OffsetFetch sends an offset fetch request to a kafka broker and returns the response.
(ctx context.Context, req *OffsetFetchRequest)
| 66 | // OffsetFetch sends an offset fetch request to a kafka broker and returns the |
| 67 | // response. |
| 68 | func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) { |
| 69 | |
| 70 | // Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API |
| 71 | // which will return the result for all topics with the desired consumer group: |
| 72 | // https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch |
| 73 | // For Kafka version below 0.10.2.x this call will result in an error |
| 74 | var topics []offsetfetch.RequestTopic |
| 75 | |
| 76 | if len(req.Topics) > 0 { |
| 77 | topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics)) |
| 78 | |
| 79 | for topicName, partitions := range req.Topics { |
| 80 | indexes := make([]int32, len(partitions)) |
| 81 | |
| 82 | for i, p := range partitions { |
| 83 | indexes[i] = int32(p) |
| 84 | } |
| 85 | |
| 86 | topics = append(topics, offsetfetch.RequestTopic{ |
| 87 | Name: topicName, |
| 88 | PartitionIndexes: indexes, |
| 89 | }) |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{ |
| 94 | GroupID: req.GroupID, |
| 95 | Topics: topics, |
| 96 | }) |
| 97 | |
| 98 | if err != nil { |
| 99 | return nil, fmt.Errorf("kafka.(*Client).OffsetFetch: %w", err) |
| 100 | } |
| 101 | |
| 102 | res := m.(*offsetfetch.Response) |
| 103 | ret := &OffsetFetchResponse{ |
| 104 | Throttle: makeDuration(res.ThrottleTimeMs), |
| 105 | Topics: make(map[string][]OffsetFetchPartition, len(res.Topics)), |
| 106 | Error: makeError(res.ErrorCode, ""), |
| 107 | } |
| 108 | |
| 109 | for _, t := range res.Topics { |
| 110 | partitions := make([]OffsetFetchPartition, len(t.Partitions)) |
| 111 | |
| 112 | for i, p := range t.Partitions { |
| 113 | partitions[i] = OffsetFetchPartition{ |
| 114 | Partition: int(p.PartitionIndex), |
| 115 | CommittedOffset: p.CommittedOffset, |
| 116 | Metadata: p.Metadata, |
| 117 | Error: makeError(p.ErrorCode, ""), |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | ret.Topics[t.Name] = partitions |
| 122 | } |
| 123 | |
| 124 | return ret, nil |
| 125 | } |