MCPcopy
hub / github.com/segmentio/kafka-go / run

Method run

reader.go:1256–1440  ·  view source on GitHub ↗
(ctx context.Context, offset int64)

Source from the content-addressed store, hash-verified

1254}
1255
1256func (r *reader) run(ctx context.Context, offset int64) {
1257 // This is the reader's main loop, it only ends if the context is canceled
1258 // and will keep attempting to reader messages otherwise.
1259 //
1260 // Retrying indefinitely has the nice side effect of preventing Read calls
1261 // on the parent reader to block if connection to the kafka server fails,
1262 // the reader keeps reporting errors on the error channel which will then
1263 // be surfaced to the program.
1264 // If the reader wasn't retrying then the program would block indefinitely
1265 // on a Read call after reading the first error.
1266 for attempt := 0; true; attempt++ {
1267 if attempt != 0 {
1268 if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
1269 return
1270 }
1271 }
1272
1273 r.withLogger(func(log Logger) {
1274 log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset))
1275 })
1276
1277 conn, start, err := r.initialize(ctx, offset)
1278 if err != nil {
1279 if errors.Is(err, OffsetOutOfRange) {
1280 if r.offsetOutOfRangeError {
1281 r.sendError(ctx, err)
1282 return
1283 }
1284
1285 // This would happen if the requested offset is passed the last
1286 // offset on the partition leader. In that case we're just going
1287 // to retry later hoping that enough data has been produced.
1288 r.withErrorLogger(func(log Logger) {
1289 log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
1290 })
1291
1292 continue
1293 }
1294
1295 // Perform a configured number of attempts before
1296 // reporting first errors, this helps mitigate
1297 // situations where the kafka server is temporarily
1298 // unavailable.
1299 if attempt >= r.maxAttempts {
1300 r.sendError(ctx, err)
1301 } else {
1302 r.stats.errors.observe(1)
1303 r.withErrorLogger(func(log Logger) {
1304 log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
1305 })
1306 }
1307 continue
1308 }
1309
1310 // Resetting the attempt counter ensures that if a failure occurs after
1311 // a successful initialization we don't keep increasing the backoff
1312 // timeout.
1313 attempt = 0

Callers

nothing calls this directly

Calls 12

withLoggerMethod · 0.95
initializeMethod · 0.95
sendErrorMethod · 0.95
withErrorLoggerMethod · 0.95
readMethod · 0.95
readOffsetsMethod · 0.95
sleepFunction · 0.85
backoffFunction · 0.85
toHumanOffsetFunction · 0.85
PrintfMethod · 0.65
observeMethod · 0.45
CloseMethod · 0.45

Tested by

no test coverage detected