ExportPartitionLagMetrics in a background goroutine by periodically querying Kafka state for the assigned and active partitions. This exports the lag metric in number of records which is different than the lag metric for age. Call ResetLagMetricsForRevokedPartitions when partitions are revoked to p
(ctx context.Context, kclient *kgo.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32, forceMetadataRefresh func())
| 45 | // stale data. For efficiency this is not detected automatically from changes in the assigned |
| 46 | // partition callback. |
| 47 | func ExportPartitionLagMetrics(ctx context.Context, kclient *kgo.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32, forceMetadataRefresh func()) { |
| 48 | go func() { |
| 49 | var ( |
| 50 | waitTime = cfg.Kafka.ConsumerGroupLagMetricUpdateInterval |
| 51 | topic = cfg.Kafka.Topic |
| 52 | group = cfg.Kafka.ConsumerGroup |
| 53 | boff = backoff.New(ctx, backoff.Config{ |
| 54 | MinBackoff: 100 * time.Millisecond, |
| 55 | MaxBackoff: waitTime, |
| 56 | MaxRetries: 5, |
| 57 | }) |
| 58 | admClient = kadm.NewClient(kclient) |
| 59 | partitionClient = NewPartitionOffsetClient(kclient, topic) |
| 60 | ) |
| 61 | |
| 62 | for { |
| 63 | select { |
| 64 | case <-time.After(waitTime): |
| 65 | var ( |
| 66 | lag kadm.GroupLag |
| 67 | err error |
| 68 | ) |
| 69 | assignedPartitions := getAssignedActivePartitions() |
| 70 | boff.Reset() |
| 71 | for boff.Ongoing() { |
| 72 | lag, err = getGroupLag(ctx, admClient, partitionClient, group, assignedPartitions) |
| 73 | if err == nil { |
| 74 | break |
| 75 | } |
| 76 | HandleKafkaError(err, forceMetadataRefresh) |
| 77 | boff.Wait() |
| 78 | } |
| 79 | |
| 80 | if err != nil { |
| 81 | level.Error(log).Log("msg", "metric lag failed:", "err", err, "retries", boff.NumRetries()) |
| 82 | continue |
| 83 | } |
| 84 | for _, p := range assignedPartitions { |
| 85 | l, ok := lag.Lookup(topic, p) |
| 86 | if ok { |
| 87 | metricPartitionLag.WithLabelValues(group, strconv.Itoa(int(p))).Set(float64(l.Lag)) |
| 88 | } |
| 89 | } |
| 90 | case <-ctx.Done(): |
| 91 | return |
| 92 | } |
| 93 | } |
| 94 | }() |
| 95 | } |
| 96 | |
| 97 | // SetPartitionLagSeconds is similar to the auto exported lag, except it is in real clock seconds |
| 98 | // which can only be known after the record is read from the queue, therefore it is set by the caller. |
no test coverage detected