MCPcopy
hub / github.com/grafana/tempo / running

Method running

modules/livestore/partition_reader.go:84–165  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

82}
83
84func (r *PartitionReader) running(ctx context.Context) error {
85 offset, err := r.fetchLastCommittedOffsetWithRetries(ctx)
86 if err != nil {
87 // Shutdown can cancel the reader before the initial offset lookup completes.
88 // Treat that as a clean stop instead of failing the service.
89 if ctx.Err() != nil {
90 return nil
91 }
92 return fmt.Errorf("failed to fetch last committed offset: %w", err)
93 }
94
95 // Fetch end offset to calculate initial lag before entering poll loop.
96 // Ensures we have data even if PollFetches blocks due to empty partition
97 if endOffsets, err := r.adm.ListEndOffsets(ctx, r.topic); err == nil {
98 if endOffset, found := endOffsets.Lookup(r.topic, r.partitionID); found && endOffset.Err == nil {
99 // Get the committed offset value
100 committedAt := offset.EpochOffset().Offset
101 if committedAt >= 0 {
102 lag := endOffset.Offset - committedAt
103 if lag < 0 {
104 lag = 0
105 }
106 r.lag.Store(lag)
107 level.Debug(r.logger).Log("msg", "initial lag calculated", "lag", lag, "committed", committedAt, "end", endOffset.Offset)
108 }
109 }
110 }
111
112 r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{r.topic: {r.partitionID: offset}})
113 defer r.client.RemoveConsumePartitions(map[string][]int32{r.topic: {r.partitionID}})
114
115 r.wg.Go(func() { r.commitLoop(ctx) })
116 r.metrics.ownedPartition.WithLabelValues(strconv.Itoa(int(r.partitionID)), r.consumerGroup).Set(1)
117
118 for ctx.Err() == nil {
119 fetches := r.client.PollFetches(ctx)
120 if fetches.Err() != nil {
121 if errors.Is(fetches.Err(), context.Canceled) {
122 return nil
123 }
124 err := collectFetchErrs(fetches)
125 level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err)
126 continue
127 }
128
129 r.recordFetchesMetrics(fetches)
130 offset, consumptionErr := r.consume(ctx, fetches.RecordIter(), time.Now())
131 if consumptionErr != nil {
132 // TODO abort ingesting & back off if it's a server error, ignore error if it's a client error
133 level.Error(r.logger).Log("msg", "encountered error processing records; skipping", "err", consumptionErr)
134 }
135 if offset != nil {
136 r.storeOffsetForCommit(ctx, offset)
137 }
138
139 // Calculate lag as the difference between the high watermark and
140 // the last successfully processed (committed) offset.
141 for _, fetch := range fetches {

Callers

nothing calls this directly

Calls 11

commitLoopMethod · 0.95
recordFetchesMetricsMethod · 0.95
storeOffsetForCommitMethod · 0.95
collectFetchErrsFunction · 0.85
StoreMethod · 0.65
LogMethod · 0.65
SetMethod · 0.65
ErrorMethod · 0.65
NowMethod · 0.65
consumeMethod · 0.45

Tested by

no test coverage detected