MCPcopy
hub / github.com/IBM/sarama / newConsumerGroupClaim

Function newConsumerGroupClaim

consumer_group.go:1281–1304  ·  view source on GitHub ↗
(sess *consumerGroupSession, topic string, partition int32, offset int64)

Source from the content-addressed store, hash-verified

1279}
1280
1281func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
1282 pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
1283
1284 if errors.Is(err, ErrOffsetOutOfRange) && sess.parent.config.Consumer.Group.ResetInvalidOffsets {
1285 offset = sess.parent.config.Consumer.Offsets.Initial
1286 pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
1287 }
1288 if err != nil {
1289 return nil, err
1290 }
1291
1292 go func() {
1293 for err := range pcm.Errors() {
1294 sess.parent.handleError(err, topic, partition)
1295 }
1296 }()
1297
1298 return &consumerGroupClaim{
1299 topic: topic,
1300 partition: partition,
1301 offset: offset,
1302 PartitionConsumer: pcm,
1303 }, nil
1304}
1305
1306func (c *consumerGroupClaim) Topic() string { return c.topic }
1307func (c *consumerGroupClaim) Partition() int32 { return c.partition }

Callers 1

newClaimWithRetry · Method · 0.85

Calls 4

Is · Method · 0.80
ConsumePartition · Method · 0.65
Errors · Method · 0.65
handleError · Method · 0.45

Tested by

no test coverage detected