| 128 | func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil } |
| 129 | |
| 130 | func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { |
| 131 | for { |
| 132 | select { |
| 133 | case message, ok := <-claim.Messages(): |
| 134 | if !ok { |
| 135 | return nil |
| 136 | } |
| 137 | c.inFlight.Add(1) |
| 138 | log.Printf("Message claimed: topic=%s partition=%d offset=%d", message.Topic, message.Partition, message.Offset) |
| 139 | session.MarkMessage(message, "") |
| 140 | c.inFlight.Add(-1) |
| 141 | case <-session.Context().Done(): |
| 142 | return nil |
| 143 | } |
| 144 | } |
| 145 | } |