MCPcopy
hub / github.com/segmentio/kafka-go / ConsumerOffsets

Method ConsumerOffsets

client.go:65–100  ·  view source on GitHub ↗

ConsumerOffsets returns a map[int]int64 of partition to committed offset for a consumer group id and topic. DEPRECATED: this method will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.

(ctx context.Context, tg TopicAndGroup)

Source from the content-addressed store, hash-verified

63// DEPRECATED: this method will be removed in version 1.0, programs should
64// migrate to use kafka.(*Client).OffsetFetch instead.
65func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
66 metadata, err := c.Metadata(ctx, &MetadataRequest{
67 Topics: []string{tg.Topic},
68 })
69
70 if err != nil {
71 return nil, fmt.Errorf("failed to get topic metadata :%w", err)
72 }
73
74 topic := metadata.Topics[0]
75 partitions := make([]int, len(topic.Partitions))
76
77 for i := range topic.Partitions {
78 partitions[i] = topic.Partitions[i].ID
79 }
80
81 offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
82 GroupID: tg.GroupId,
83 Topics: map[string][]int{
84 tg.Topic: partitions,
85 },
86 })
87
88 if err != nil {
89 return nil, fmt.Errorf("failed to get offsets: %w", err)
90 }
91
92 topicOffsets := offsets.Topics[topic.Name]
93 partitionOffsets := make(map[int]int64, len(topicOffsets))
94
95 for _, off := range topicOffsets {
96 partitionOffsets[off.Partition] = off.CommittedOffset
97 }
98
99 return partitionOffsets, nil
100}
101
102func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
103 if c.Timeout > 0 {

Callers 1

Calls 2

MetadataMethod · 0.95
OffsetFetchMethod · 0.95

Tested by 1