(memberID string)
| 1203 | } |
| 1204 | |
| 1205 | func (cg *ConsumerGroup) leaveGroup(memberID string) error { |
| 1206 | // don't attempt to leave the group if no memberID was ever assigned. |
| 1207 | if memberID == "" { |
| 1208 | return nil |
| 1209 | } |
| 1210 | |
| 1211 | cg.withLogger(func(log Logger) { |
| 1212 | log.Printf("Leaving group %s, member %s", cg.config.ID, memberID) |
| 1213 | }) |
| 1214 | |
| 1215 | // IMPORTANT : leaveGroup establishes its own connection to the coordinator |
| 1216 | // because it is often called after some other operation failed. |
| 1217 | // said failure could be the result of connection-level issues, |
| 1218 | // so we want to re-establish the connection to ensure that we |
| 1219 | // are able to process the cleanup step. |
| 1220 | coordinator, err := cg.coordinator() |
| 1221 | if err != nil { |
| 1222 | return err |
| 1223 | } |
| 1224 | |
| 1225 | _, err = coordinator.leaveGroup(leaveGroupRequestV0{ |
| 1226 | GroupID: cg.config.ID, |
| 1227 | MemberID: memberID, |
| 1228 | }) |
| 1229 | if err != nil { |
| 1230 | cg.withErrorLogger(func(log Logger) { |
| 1231 | log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err) |
| 1232 | }) |
| 1233 | } |
| 1234 | |
| 1235 | _ = coordinator.Close() |
| 1236 | |
| 1237 | return err |
| 1238 | } |
| 1239 | |
| 1240 | func (cg *ConsumerGroup) withLogger(do func(Logger)) { |
| 1241 | if cg.config.Logger != nil { |
no test coverage detected