(topic string, partition int32)
| 1050 | } |
| 1051 | |
| 1052 | func (s *consumerGroupSession) consume(topic string, partition int32) { |
| 1053 | // quick exit if rebalance is due |
| 1054 | select { |
| 1055 | case <-s.ctx.Done(): |
| 1056 | return |
| 1057 | case <-s.parent.closed: |
| 1058 | return |
| 1059 | default: |
| 1060 | } |
| 1061 | |
| 1062 | // get next offset |
| 1063 | offset := s.parent.config.Consumer.Offsets.Initial |
| 1064 | if pom := s.offsets.findPOM(topic, partition); pom != nil { |
| 1065 | offset, _ = pom.NextOffset() |
| 1066 | } |
| 1067 | |
| 1068 | // create new claim |
| 1069 | claim, err := s.newClaimWithRetry(topic, partition, offset) |
| 1070 | if err != nil { |
| 1071 | s.parent.handleError(err, topic, partition) |
| 1072 | return |
| 1073 | } |
| 1074 | |
| 1075 | // trigger close when session is done |
| 1076 | go func() { |
| 1077 | select { |
| 1078 | case <-s.ctx.Done(): |
| 1079 | case <-s.parent.closed: |
| 1080 | } |
| 1081 | claim.AsyncClose() |
| 1082 | }() |
| 1083 | |
| 1084 | // start processing |
| 1085 | if err := s.handler.ConsumeClaim(s, claim); err != nil { |
| 1086 | s.parent.handleError(err, topic, partition) |
| 1087 | } |
| 1088 | |
| 1089 | // ensure consumer is closed & drained |
| 1090 | claim.AsyncClose() |
| 1091 | for _, err := range claim.waitClosed() { |
| 1092 | s.parent.handleError(err, topic, partition) |
| 1093 | } |
| 1094 | } |
| 1095 | |
| 1096 | func (s *consumerGroupSession) release(withCleanup bool) (err error) { |
| 1097 | // signal release, stop heartbeat |
no test coverage detected