ConsumerOffsets returns a map[int]int64 of partition to committed offset for a consumer group id and topic. DEPRECATED: this method will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.
(ctx context.Context, tg TopicAndGroup)
| 63 | // DEPRECATED: this method will be removed in version 1.0, programs should |
| 64 | // migrate to use kafka.(*Client).OffsetFetch instead. |
| 65 | func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) { |
| 66 | metadata, err := c.Metadata(ctx, &MetadataRequest{ |
| 67 | Topics: []string{tg.Topic}, |
| 68 | }) |
| 69 | |
| 70 | if err != nil { |
| 71 | return nil, fmt.Errorf("failed to get topic metadata :%w", err) |
| 72 | } |
| 73 | |
| 74 | topic := metadata.Topics[0] |
| 75 | partitions := make([]int, len(topic.Partitions)) |
| 76 | |
| 77 | for i := range topic.Partitions { |
| 78 | partitions[i] = topic.Partitions[i].ID |
| 79 | } |
| 80 | |
| 81 | offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{ |
| 82 | GroupID: tg.GroupId, |
| 83 | Topics: map[string][]int{ |
| 84 | tg.Topic: partitions, |
| 85 | }, |
| 86 | }) |
| 87 | |
| 88 | if err != nil { |
| 89 | return nil, fmt.Errorf("failed to get offsets: %w", err) |
| 90 | } |
| 91 | |
| 92 | topicOffsets := offsets.Topics[topic.Name] |
| 93 | partitionOffsets := make(map[int]int64, len(topicOffsets)) |
| 94 | |
| 95 | for _, off := range topicOffsets { |
| 96 | partitionOffsets[off.Partition] = off.CommittedOffset |
| 97 | } |
| 98 | |
| 99 | return partitionOffsets, nil |
| 100 | } |
| 101 | |
| 102 | func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) { |
| 103 | if c.Timeout > 0 { |