(sess *consumerGroupSession, topic string, partition int32, offset int64)
| 1279 | } |
| 1280 | |
| 1281 | func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) { |
| 1282 | pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset) |
| 1283 | |
| 1284 | if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets { |
| 1285 | offset = sess.parent.config.Consumer.Offsets.Initial |
| 1286 | pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset) |
| 1287 | } |
| 1288 | if err != nil { |
| 1289 | return nil, err |
| 1290 | } |
| 1291 | |
| 1292 | go func() { |
| 1293 | for err := range pcm.Errors() { |
| 1294 | sess.parent.handleError(err, topic, partition) |
| 1295 | } |
| 1296 | }() |
| 1297 | |
| 1298 | return &consumerGroupClaim{ |
| 1299 | topic: topic, |
| 1300 | partition: partition, |
| 1301 | offset: offset, |
| 1302 | PartitionConsumer: pcm, |
| 1303 | }, nil |
| 1304 | } |
| 1305 | |
| 1306 | func (c *consumerGroupClaim) Topic() string { return c.topic } |
| 1307 | func (c *consumerGroupClaim) Partition() int32 { return c.partition } |
no test coverage detected