MCPcopy
hub / github.com/grafana/tempo / LeaveConsumerGroupByInstanceID

Function LeaveConsumerGroupByInstanceID

pkg/ingest/consumer_group.go:27–61  ·  view source on GitHub ↗

LeaveConsumerGroupByInstanceID sends a LeaveGroup request for the given instance ID so the coordinator can rebalance without waiting for session timeout. Use this on shutdown when using static membership (InstanceID): franz-go does not send LeaveGroup on Close() when InstanceID is set. Requires Kaf

(ctx context.Context, client *kgo.Client, group, instanceID string, logger log.Logger)

Source from the content-addressed store, hash-verified

25//
26// No-op if instanceID is empty.
27func LeaveConsumerGroupByInstanceID(ctx context.Context, client *kgo.Client, group, instanceID string, logger log.Logger) error {
28 if instanceID == "" {
29 return nil
30 }
31 req := kmsg.NewPtrLeaveGroupRequest()
32 req.Group = group
33 member := kmsg.NewLeaveGroupRequestMember()
34 member.InstanceID = &instanceID
35 req.Members = append(req.Members, member)
36 resp, err := req.RequestWith(ctx, client)
37 if err != nil {
38 return err
39 }
40 if err := kerr.ErrorForCode(resp.ErrorCode); err != nil {
41 return err
42 }
43 // v3+ responses carry per-member error codes; check those too.
44 var memberErr error
45 for _, m := range resp.Members {
46 if m.ErrorCode == 0 {
47 continue
48 }
49 if m.InstanceID != nil && *m.InstanceID == instanceID {
50 return kerr.ErrorForCode(m.ErrorCode)
51 }
52 if memberErr == nil {
53 memberErr = kerr.ErrorForCode(m.ErrorCode)
54 }
55 }
56 if memberErr != nil {
57 return memberErr
58 }
59 level.Info(logger).Log("msg", "left Kafka consumer group by instance ID", "group", group, "instance_id", instanceID)
60 return nil
61}

Calls 1

LogMethod · 0.65