LeaveConsumerGroupByInstanceID sends a LeaveGroup request for the given instance ID so the coordinator can rebalance without waiting for session timeout. Use this on shutdown when using static membership (InstanceID): franz-go does not send LeaveGroup on Close() when InstanceID is set. Requires Kaf
(ctx context.Context, client *kgo.Client, group, instanceID string, logger log.Logger)
| 25 | // |
| 26 | // No-op if instanceID is empty. |
| 27 | func LeaveConsumerGroupByInstanceID(ctx context.Context, client *kgo.Client, group, instanceID string, logger log.Logger) error { |
| 28 | if instanceID == "" { |
| 29 | return nil |
| 30 | } |
| 31 | req := kmsg.NewPtrLeaveGroupRequest() |
| 32 | req.Group = group |
| 33 | member := kmsg.NewLeaveGroupRequestMember() |
| 34 | member.InstanceID = &instanceID |
| 35 | req.Members = append(req.Members, member) |
| 36 | resp, err := req.RequestWith(ctx, client) |
| 37 | if err != nil { |
| 38 | return err |
| 39 | } |
| 40 | if err := kerr.ErrorForCode(resp.ErrorCode); err != nil { |
| 41 | return err |
| 42 | } |
| 43 | // v3+ responses carry per-member error codes; check those too. |
| 44 | var memberErr error |
| 45 | for _, m := range resp.Members { |
| 46 | if m.ErrorCode == 0 { |
| 47 | continue |
| 48 | } |
| 49 | if m.InstanceID != nil && *m.InstanceID == instanceID { |
| 50 | return kerr.ErrorForCode(m.ErrorCode) |
| 51 | } |
| 52 | if memberErr == nil { |
| 53 | memberErr = kerr.ErrorForCode(m.ErrorCode) |
| 54 | } |
| 55 | } |
| 56 | if memberErr != nil { |
| 57 | return memberErr |
| 58 | } |
| 59 | level.Info(logger).Log("msg", "left Kafka consumer group by instance ID", "group", group, "instance_id", instanceID) |
| 60 | return nil |
| 61 | } |