MCPcopy
hub / github.com/segmentio/kafka-go / leaveGroup

Method leaveGroup

consumergroup.go:1205–1238  ·  view source on GitHub ↗
(memberID string)

Source from the content-addressed store, hash-verified

1203}
1204
1205func (cg *ConsumerGroup) leaveGroup(memberID string) error {
1206 // don't attempt to leave the group if no memberID was ever assigned.
1207 if memberID == "" {
1208 return nil
1209 }
1210
1211 cg.withLogger(func(log Logger) {
1212 log.Printf("Leaving group %s, member %s", cg.config.ID, memberID)
1213 })
1214
1215 // IMPORTANT : leaveGroup establishes its own connection to the coordinator
1216 // because it is often called after some other operation failed.
1217 // said failure could be the result of connection-level issues,
1218 // so we want to re-establish the connection to ensure that we
1219 // are able to process the cleanup step.
1220 coordinator, err := cg.coordinator()
1221 if err != nil {
1222 return err
1223 }
1224
1225 _, err = coordinator.leaveGroup(leaveGroupRequestV0{
1226 GroupID: cg.config.ID,
1227 MemberID: memberID,
1228 })
1229 if err != nil {
1230 cg.withErrorLogger(func(log Logger) {
1231 log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err)
1232 })
1233 }
1234
1235 _ = coordinator.Close()
1236
1237 return err
1238}
1239
1240func (cg *ConsumerGroup) withLogger(do func(Logger)) {
1241 if cg.config.Logger != nil {

Callers 1

runMethod · 0.95

Calls 6

withLoggerMethod · 0.95
coordinatorMethod · 0.95
withErrorLoggerMethod · 0.95
PrintfMethod · 0.65
leaveGroupMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected