MCPcopy
hub / github.com/IBM/sarama / consume

Method consume

consumer_group.go:1052–1094  ·  view source on GitHub ↗
(topic string, partition int32)

Source from the content-addressed store, hash-verified

1050}
1051
1052func (s *consumerGroupSession) consume(topic string, partition int32) {
1053 // quick exit if rebalance is due
1054 select {
1055 case <-s.ctx.Done():
1056 return
1057 case <-s.parent.closed:
1058 return
1059 default:
1060 }
1061
1062 // get next offset
1063 offset := s.parent.config.Consumer.Offsets.Initial
1064 if pom := s.offsets.findPOM(topic, partition); pom != nil {
1065 offset, _ = pom.NextOffset()
1066 }
1067
1068 // create new claim
1069 claim, err := s.newClaimWithRetry(topic, partition, offset)
1070 if err != nil {
1071 s.parent.handleError(err, topic, partition)
1072 return
1073 }
1074
1075 // trigger close when session is done
1076 go func() {
1077 select {
1078 case <-s.ctx.Done():
1079 case <-s.parent.closed:
1080 }
1081 claim.AsyncClose()
1082 }()
1083
1084 // start processing
1085 if err := s.handler.ConsumeClaim(s, claim); err != nil {
1086 s.parent.handleError(err, topic, partition)
1087 }
1088
1089 // ensure consumer is closed & drained
1090 claim.AsyncClose()
1091 for _, err := range claim.waitClosed() {
1092 s.parent.handleError(err, topic, partition)
1093 }
1094}
1095
1096func (s *consumerGroupSession) release(withCleanup bool) (err error) {
1097 // signal release, stop heartbeat

Callers 1

newConsumerGroupSessionFunction · 0.95

Calls 8

newClaimWithRetryMethod · 0.95
findPOMMethod · 0.80
waitClosedMethod · 0.80
DoneMethod · 0.65
NextOffsetMethod · 0.65
AsyncCloseMethod · 0.65
ConsumeClaimMethod · 0.65
handleErrorMethod · 0.45

Tested by

no test coverage detected