MCPcopy
hub / github.com/grafana/tempo / ExportPartitionLagMetrics

Function ExportPartitionLagMetrics

pkg/ingest/metrics.go:47–95  ·  view source on GitHub ↗

ExportPartitionLagMetrics exports lag metrics in a background goroutine by periodically querying Kafka state for the assigned and active partitions. This exports the lag metric in number of records, which is different from the lag metric for age. Call ResetLagMetricsForRevokedPartitions when partitions are revoked to prevent exporting stale data.

(ctx context.Context, kclient *kgo.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32, forceMetadataRefresh func())

Source from the content-addressed store, hash-verified

45// stale data. For efficiency this is not detected automatically from changes inthe assigned
46// partition callback.
47func ExportPartitionLagMetrics(ctx context.Context, kclient *kgo.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32, forceMetadataRefresh func()) {
48 go func() {
49 var (
50 waitTime = cfg.Kafka.ConsumerGroupLagMetricUpdateInterval
51 topic = cfg.Kafka.Topic
52 group = cfg.Kafka.ConsumerGroup
53 boff = backoff.New(ctx, backoff.Config{
54 MinBackoff: 100 * time.Millisecond,
55 MaxBackoff: waitTime,
56 MaxRetries: 5,
57 })
58 admClient = kadm.NewClient(kclient)
59 partitionClient = NewPartitionOffsetClient(kclient, topic)
60 )
61
62 for {
63 select {
64 case <-time.After(waitTime):
65 var (
66 lag kadm.GroupLag
67 err error
68 )
69 assignedPartitions := getAssignedActivePartitions()
70 boff.Reset()
71 for boff.Ongoing() {
72 lag, err = getGroupLag(ctx, admClient, partitionClient, group, assignedPartitions)
73 if err == nil {
74 break
75 }
76 HandleKafkaError(err, forceMetadataRefresh)
77 boff.Wait()
78 }
79
80 if err != nil {
81 level.Error(log).Log("msg", "metric lag failed:", "err", err, "retries", boff.NumRetries())
82 continue
83 }
84 for _, p := range assignedPartitions {
85 l, ok := lag.Lookup(topic, p)
86 if ok {
87 metricPartitionLag.WithLabelValues(group, strconv.Itoa(int(p))).Set(float64(l.Lag))
88 }
89 }
90 case <-ctx.Done():
91 return
92 }
93 }
94 }()
95}
96
97// SetPartitionLagSeconds is similar to the auto exported lag, except it is in real clock seconds
98// which can only be known after the record is read from the queue, therefore it is set by the caller.

Callers 3

starting — method · 0.92
startKafka — method · 0.92
startKafkaIngestPath — method · 0.92

Calls 9

NewPartitionOffsetClient — function · 0.85
getGroupLag — function · 0.85
HandleKafkaError — function · 0.85
Reset — method · 0.65
Wait — method · 0.65
Log — method · 0.65
Error — method · 0.65
Set — method · 0.65
Done — method · 0.65

Tested by

no test coverage detected