(ctx context.Context)
| 82 | } |
| 83 | |
| 84 | func (r *PartitionReader) running(ctx context.Context) error { |
| 85 | offset, err := r.fetchLastCommittedOffsetWithRetries(ctx) |
| 86 | if err != nil { |
| 87 | // Shutdown can cancel the reader before the initial offset lookup completes. |
| 88 | // Treat that as a clean stop instead of failing the service. |
| 89 | if ctx.Err() != nil { |
| 90 | return nil |
| 91 | } |
| 92 | return fmt.Errorf("failed to fetch last committed offset: %w", err) |
| 93 | } |
| 94 | |
| 95 | // Fetch end offset to calculate initial lag before entering poll loop. |
| 96 | // Ensures we have data even if PollFetches blocks due to empty partition |
| 97 | if endOffsets, err := r.adm.ListEndOffsets(ctx, r.topic); err == nil { |
| 98 | if endOffset, found := endOffsets.Lookup(r.topic, r.partitionID); found && endOffset.Err == nil { |
| 99 | // Get the committed offset value |
| 100 | committedAt := offset.EpochOffset().Offset |
| 101 | if committedAt >= 0 { |
| 102 | lag := endOffset.Offset - committedAt |
| 103 | if lag < 0 { |
| 104 | lag = 0 |
| 105 | } |
| 106 | r.lag.Store(lag) |
| 107 | level.Debug(r.logger).Log("msg", "initial lag calculated", "lag", lag, "committed", committedAt, "end", endOffset.Offset) |
| 108 | } |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{r.topic: {r.partitionID: offset}}) |
| 113 | defer r.client.RemoveConsumePartitions(map[string][]int32{r.topic: {r.partitionID}}) |
| 114 | |
| 115 | r.wg.Go(func() { r.commitLoop(ctx) }) |
| 116 | r.metrics.ownedPartition.WithLabelValues(strconv.Itoa(int(r.partitionID)), r.consumerGroup).Set(1) |
| 117 | |
| 118 | for ctx.Err() == nil { |
| 119 | fetches := r.client.PollFetches(ctx) |
| 120 | if fetches.Err() != nil { |
| 121 | if errors.Is(fetches.Err(), context.Canceled) { |
| 122 | return nil |
| 123 | } |
| 124 | err := collectFetchErrs(fetches) |
| 125 | level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err) |
| 126 | continue |
| 127 | } |
| 128 | |
| 129 | r.recordFetchesMetrics(fetches) |
| 130 | offset, consumptionErr := r.consume(ctx, fetches.RecordIter(), time.Now()) |
| 131 | if consumptionErr != nil { |
| 132 | // TODO abort ingesting & back off if it's a server error, ignore error if it's a client error |
| 133 | level.Error(r.logger).Log("msg", "encountered error processing records; skipping", "err", consumptionErr) |
| 134 | } |
| 135 | if offset != nil { |
| 136 | r.storeOffsetForCommit(ctx, offset) |
| 137 | } |
| 138 | |
| 139 | // Calculate lag as the difference between the high watermark and |
| 140 | // the last successfully processed (committed) offset. |
| 141 | for _, fetch := range fetches { |
nothing calls this directly
no test coverage detected