MCPcopy
hub / github.com/segmentio/kafka-go / LeaveGroup

Method LeaveGroup

leavegroup.go:64–111  ·  view source on GitHub ↗
(ctx context.Context, req *LeaveGroupRequest)

Source from the content-addressed store, hash-verified

62}
63
64func (c *Client) LeaveGroup(ctx context.Context, req *LeaveGroupRequest) (*LeaveGroupResponse, error) {
65 leaveGroup := leavegroup.Request{
66 GroupID: req.GroupID,
67 Members: make([]leavegroup.RequestMember, 0, len(req.Members)),
68 }
69
70 for _, member := range req.Members {
71 leaveGroup.Members = append(leaveGroup.Members, leavegroup.RequestMember{
72 MemberID: member.ID,
73 GroupInstanceID: member.GroupInstanceID,
74 })
75 }
76
77 m, err := c.roundTrip(ctx, req.Addr, &leaveGroup)
78 if err != nil {
79 return nil, fmt.Errorf("kafka.(*Client).LeaveGroup: %w", err)
80 }
81
82 r := m.(*leavegroup.Response)
83
84 res := &LeaveGroupResponse{
85 Error: makeError(r.ErrorCode, ""),
86 Throttle: makeDuration(r.ThrottleTimeMS),
87 }
88
89 if len(r.Members) == 0 {
90 // If we're using a version of the api without the
91 // members array in the response, just add a member
92 // so the api is consistent across versions.
93 r.Members = []leavegroup.ResponseMember{
94 {
95 MemberID: req.Members[0].ID,
96 GroupInstanceID: req.Members[0].GroupInstanceID,
97 },
98 }
99 }
100
101 res.Members = make([]LeaveGroupResponseMember, 0, len(r.Members))
102 for _, member := range r.Members {
103 res.Members = append(res.Members, LeaveGroupResponseMember{
104 ID: member.MemberID,
105 GroupInstanceID: member.GroupInstanceID,
106 Error: makeError(member.ErrorCode, ""),
107 })
108 }
109
110 return res, nil
111}
112
113type leaveGroupRequestV0 struct {
114 // GroupID holds the unique group identifier

Callers 1

TestClientLeaveGroup (Function) · 0.80

Calls 3

roundTrip (Method) · 0.95
makeError (Function) · 0.85
makeDuration (Function) · 0.85

Tested by 1

TestClientLeaveGroup (Function) · 0.64