(offsetsByPartition map[topicPartition]int64)
| 1179 | } |
| 1180 | |
| 1181 | func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { |
| 1182 | if r.closed { |
| 1183 | // don't start child reader if parent Reader is closed |
| 1184 | return |
| 1185 | } |
| 1186 | |
| 1187 | ctx, cancel := context.WithCancel(context.Background()) |
| 1188 | |
| 1189 | r.cancel() // always cancel the previous reader |
| 1190 | r.cancel = cancel |
| 1191 | r.version++ |
| 1192 | |
| 1193 | r.join.Add(len(offsetsByPartition)) |
| 1194 | for key, offset := range offsetsByPartition { |
| 1195 | go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) { |
| 1196 | defer join.Done() |
| 1197 | |
| 1198 | (&reader{ |
| 1199 | dialer: r.config.Dialer, |
| 1200 | logger: r.config.Logger, |
| 1201 | errorLogger: r.config.ErrorLogger, |
| 1202 | brokers: r.config.Brokers, |
| 1203 | topic: key.topic, |
| 1204 | partition: int(key.partition), |
| 1205 | minBytes: r.config.MinBytes, |
| 1206 | maxBytes: r.config.MaxBytes, |
| 1207 | maxWait: r.config.MaxWait, |
| 1208 | readBatchTimeout: r.config.ReadBatchTimeout, |
| 1209 | backoffDelayMin: r.config.ReadBackoffMin, |
| 1210 | backoffDelayMax: r.config.ReadBackoffMax, |
| 1211 | version: r.version, |
| 1212 | msgs: r.msgs, |
| 1213 | stats: r.stats, |
| 1214 | isolationLevel: r.config.IsolationLevel, |
| 1215 | maxAttempts: r.config.MaxAttempts, |
| 1216 | |
| 1217 | // backwards-compatibility flags |
| 1218 | offsetOutOfRangeError: r.config.OffsetOutOfRangeError, |
| 1219 | }).run(ctx, offset) |
| 1220 | }(ctx, key, offset, &r.join) |
| 1221 | } |
| 1222 | } |
| 1223 | |
| 1224 | // A reader reads messages from kafka and produces them on its channels, it's |
| 1225 | // used as a way to asynchronously fetch messages while the main program reads |
no test coverage detected