MCPcopy
hub / github.com/IBM/sarama / release

Method release

consumer_group.go:1096–1125  ·  view source on GitHub ↗
(withCleanup bool)

Source from the content-addressed store, hash-verified

1094}
1095
1096func (s *consumerGroupSession) release(withCleanup bool) (err error) {
1097 // signal release, stop heartbeat
1098 s.cancel(nil)
1099
1100 // wait for consumers to exit
1101 s.waitGroup.Wait()
1102
1103 // perform release
1104 s.releaseOnce.Do(func() {
1105 if withCleanup {
1106 if e := s.handler.Cleanup(s); e != nil {
1107 s.parent.handleError(e, "", -1)
1108 err = e
1109 }
1110 }
1111
1112 if e := s.offsets.Close(); e != nil {
1113 err = e
1114 }
1115
1116 close(s.hbDying)
1117 <-s.hbDead
1118 })
1119
1120 Logger.Printf(
1121 "consumergroup/session/%s/%d released\n",
1122 s.MemberID(), s.GenerationID())
1123
1124 return
1125}
1126
1127func (s *consumerGroupSession) heartbeatLoop() {
1128 defer close(s.hbDead)

Callers 2

newConsumerGroupSession (Function) · 0.95
Consume (Method) · 0.45

Calls 6

MemberID (Method) · 0.95
GenerationID (Method) · 0.95
Cleanup (Method) · 0.65
Close (Method) · 0.65
Printf (Method) · 0.65
handleError (Method) · 0.45

Tested by

no test coverage detected