MCPcopy
hub / github.com/segmentio/kafka-go / start

Method start

reader.go:1181–1222  ·  view source on GitHub ↗
(offsetsByPartition map[topicPartition]int64)

Source from the content-addressed store, hash-verified

1179}
1180
1181func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
1182 if r.closed {
1183 // don't start child reader if parent Reader is closed
1184 return
1185 }
1186
1187 ctx, cancel := context.WithCancel(context.Background())
1188
1189 r.cancel() // always cancel the previous reader
1190 r.cancel = cancel
1191 r.version++
1192
1193 r.join.Add(len(offsetsByPartition))
1194 for key, offset := range offsetsByPartition {
1195 go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
1196 defer join.Done()
1197
1198 (&reader{
1199 dialer: r.config.Dialer,
1200 logger: r.config.Logger,
1201 errorLogger: r.config.ErrorLogger,
1202 brokers: r.config.Brokers,
1203 topic: key.topic,
1204 partition: int(key.partition),
1205 minBytes: r.config.MinBytes,
1206 maxBytes: r.config.MaxBytes,
1207 maxWait: r.config.MaxWait,
1208 readBatchTimeout: r.config.ReadBatchTimeout,
1209 backoffDelayMin: r.config.ReadBackoffMin,
1210 backoffDelayMax: r.config.ReadBackoffMax,
1211 version: r.version,
1212 msgs: r.msgs,
1213 stats: r.stats,
1214 isolationLevel: r.config.IsolationLevel,
1215 maxAttempts: r.config.MaxAttempts,
1216
1217 // backwards-compatibility flags
1218 offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
1219 }).run(ctx, offset)
1220 }(ctx, key, offset, &r.join)
1221 }
1222}
1223
1224// A reader reads messages from kafka and produces them on its channels, it's
1225// used as a way to asynchronously fetch messages while the main program reads

Callers 3

subscribeMethod · 0.95
FetchMessageMethod · 0.95
SetOffsetMethod · 0.95

Calls 2

DoneMethod · 0.80
runMethod · 0.45

Tested by

no test coverage detected