Fetch sends a fetch request to a Kafka broker and returns the response. If the broker returns an invalid response with no topics, an error wrapping protocol.ErrNoTopic is returned. If the broker returns an invalid response with no partitions, an error wrapping ErrNoPartitions is returned.
(ctx context.Context, req *FetchRequest)
| 86 | // If the broker returned an invalid response with no partitions, an error |
| 87 | // wrapping ErrNoPartitions is returned. |
| 88 | func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) { |
| 89 | timeout := c.timeout(ctx, math.MaxInt64) |
| 90 | maxWait := req.maxWait() |
| 91 | |
| 92 | if maxWait < timeout { |
| 93 | timeout = maxWait |
| 94 | } |
| 95 | |
| 96 | offset := req.Offset |
| 97 | switch offset { |
| 98 | case FirstOffset, LastOffset: |
| 99 | topic, partition := req.Topic, req.Partition |
| 100 | |
| 101 | r, err := c.ListOffsets(ctx, &ListOffsetsRequest{ |
| 102 | Addr: req.Addr, |
| 103 | Topics: map[string][]OffsetRequest{ |
| 104 | topic: {{ |
| 105 | Partition: partition, |
| 106 | Timestamp: offset, |
| 107 | }}, |
| 108 | }, |
| 109 | }) |
| 110 | if err != nil { |
| 111 | return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err) |
| 112 | } |
| 113 | |
| 114 | for _, p := range r.Topics[topic] { |
| 115 | if p.Partition == partition { |
| 116 | if p.Error != nil { |
| 117 | return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", p.Error) |
| 118 | } |
| 119 | switch offset { |
| 120 | case FirstOffset: |
| 121 | offset = p.FirstOffset |
| 122 | case LastOffset: |
| 123 | offset = p.LastOffset |
| 124 | } |
| 125 | break |
| 126 | } |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | m, err := c.roundTrip(ctx, req.Addr, &fetchAPI.Request{ |
| 131 | ReplicaID: -1, |
| 132 | MaxWaitTime: milliseconds(timeout), |
| 133 | MinBytes: int32(req.MinBytes), |
| 134 | MaxBytes: int32(req.MaxBytes), |
| 135 | IsolationLevel: int8(req.IsolationLevel), |
| 136 | SessionID: -1, |
| 137 | SessionEpoch: -1, |
| 138 | Topics: []fetchAPI.RequestTopic{{ |
| 139 | Topic: req.Topic, |
| 140 | Partitions: []fetchAPI.RequestPartition{{ |
| 141 | Partition: int32(req.Partition), |
| 142 | CurrentLeaderEpoch: -1, |
| 143 | FetchOffset: offset, |
| 144 | LogStartOffset: -1, |
| 145 | PartitionMaxBytes: int32(req.MaxBytes), |