(withCleanup bool)
| 1094 | } |
| 1095 | |
| 1096 | func (s *consumerGroupSession) release(withCleanup bool) (err error) { |
| 1097 | // signal release, stop heartbeat |
| 1098 | s.cancel(nil) |
| 1099 | |
| 1100 | // wait for consumers to exit |
| 1101 | s.waitGroup.Wait() |
| 1102 | |
| 1103 | // perform release |
| 1104 | s.releaseOnce.Do(func() { |
| 1105 | if withCleanup { |
| 1106 | if e := s.handler.Cleanup(s); e != nil { |
| 1107 | s.parent.handleError(e, "", -1) |
| 1108 | err = e |
| 1109 | } |
| 1110 | } |
| 1111 | |
| 1112 | if e := s.offsets.Close(); e != nil { |
| 1113 | err = e |
| 1114 | } |
| 1115 | |
| 1116 | close(s.hbDying) |
| 1117 | <-s.hbDead |
| 1118 | }) |
| 1119 | |
| 1120 | Logger.Printf( |
| 1121 | "consumergroup/session/%s/%d released\n", |
| 1122 | s.MemberID(), s.GenerationID()) |
| 1123 | |
| 1124 | return |
| 1125 | } |
| 1126 | |
| 1127 | func (s *consumerGroupSession) heartbeatLoop() { |
| 1128 | defer close(s.hbDead) |
no test coverage detected