(ctx context.Context, offset int64)
| 1254 | } |
| 1255 | |
| 1256 | func (r *reader) run(ctx context.Context, offset int64) { |
| 1257 | // This is the reader's main loop, it only ends if the context is canceled |
| 1258 | // and will keep attempting to read messages otherwise. |
| 1259 | // |
| 1260 | // Retrying indefinitely has the nice side effect of preventing Read calls |
| 1261 | // on the parent reader from blocking if the connection to the kafka server fails, |
| 1262 | // the reader keeps reporting errors on the error channel which will then |
| 1263 | // be surfaced to the program. |
| 1264 | // If the reader wasn't retrying then the program would block indefinitely |
| 1265 | // on a Read call after reading the first error. |
| 1266 | for attempt := 0; true; attempt++ { |
| 1267 | if attempt != 0 { |
| 1268 | if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) { |
| 1269 | return |
| 1270 | } |
| 1271 | } |
| 1272 | |
| 1273 | r.withLogger(func(log Logger) { |
| 1274 | log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset)) |
| 1275 | }) |
| 1276 | |
| 1277 | conn, start, err := r.initialize(ctx, offset) |
| 1278 | if err != nil { |
| 1279 | if errors.Is(err, OffsetOutOfRange) { |
| 1280 | if r.offsetOutOfRangeError { |
| 1281 | r.sendError(ctx, err) |
| 1282 | return |
| 1283 | } |
| 1284 | |
| 1285 | // This would happen if the requested offset is past the last |
| 1286 | // offset on the partition leader. In that case we're just going |
| 1287 | // to retry later hoping that enough data has been produced. |
| 1288 | r.withErrorLogger(func(log Logger) { |
| 1289 | log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err) |
| 1290 | }) |
| 1291 | |
| 1292 | continue |
| 1293 | } |
| 1294 | |
| 1295 | // Perform a configured number of attempts before |
| 1296 | // reporting first errors, this helps mitigate |
| 1297 | // situations where the kafka server is temporarily |
| 1298 | // unavailable. |
| 1299 | if attempt >= r.maxAttempts { |
| 1300 | r.sendError(ctx, err) |
| 1301 | } else { |
| 1302 | r.stats.errors.observe(1) |
| 1303 | r.withErrorLogger(func(log Logger) { |
| 1304 | log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err) |
| 1305 | }) |
| 1306 | } |
| 1307 | continue |
| 1308 | } |
| 1309 | |
| 1310 | // Resetting the attempt counter ensures that if a failure occurs after |
| 1311 | // a successful initialization we don't keep increasing the backoff |
| 1312 | // timeout. |
| 1313 | attempt = 0 |
nothing calls this directly
no test coverage detected