MCPcopy
hub / github.com/grafana/tempo / getGroupLag

Function getGroupLag

pkg/ingest/metrics.go:122–148  ·  view source on GitHub ↗

getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits. The lag is the difference between the last produced offset (high watermark) and an

(ctx context.Context, admClient *kadm.Client, partitionClient *PartitionOffsetClient, group string, assignedPartitions []int32)

Source from the content-addressed store, hash-verified

120// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
121// running for the first time), then the lag is the difference between the last produced offset and the partition's start offset.
122func getGroupLag(ctx context.Context, admClient *kadm.Client, partitionClient *PartitionOffsetClient, group string, assignedPartitions []int32) (kadm.GroupLag, error) {
123 offsets, err := admClient.FetchOffsets(ctx, group)
124 if err != nil {
125 if !errors.Is(err, kerr.GroupIDNotFound) {
126 return nil, fmt.Errorf("fetch offsets: %w", err)
127 }
128 }
129 if err := offsets.Error(); err != nil {
130 return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
131 }
132
133 startOffsets, err := partitionClient.FetchPartitionsStartProducedOffsets(ctx, assignedPartitions)
134 if err != nil {
135 return nil, err
136 }
137 endOffsets, err := partitionClient.FetchPartitionsLastProducedOffsets(ctx, assignedPartitions)
138 if err != nil {
139 return nil, err
140 }
141
142 descrGroup := kadm.DescribedGroup{
143 // "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
144 // because we don't use group consumption.
145 State: "Empty",
146 }
147 return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
148}

Callers 1

Tested by

no test coverage detected