hub / github.com/segmentio/kafka-go / Fetch

Method Fetch

fetch.go:88–184

Fetch sends a fetch request to a Kafka broker and returns the response. If the broker returns an invalid response with no topics, Fetch returns an error wrapping protocol.ErrNoTopic. If the broker returns an invalid response with no partitions, Fetch returns an error wrapping ErrNoPartitions.

(ctx context.Context, req *FetchRequest)

Source from the content-addressed store, hash-verified

// If the broker returned an invalid response with no partitions, an error
// wrapping ErrNoPartitions is returned.
func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error) {
	timeout := c.timeout(ctx, math.MaxInt64)
	maxWait := req.maxWait()

	if maxWait < timeout {
		timeout = maxWait
	}

	offset := req.Offset
	switch offset {
	case FirstOffset, LastOffset:
		topic, partition := req.Topic, req.Partition

		r, err := c.ListOffsets(ctx, &ListOffsetsRequest{
			Addr: req.Addr,
			Topics: map[string][]OffsetRequest{
				topic: {{
					Partition: partition,
					Timestamp: offset,
				}},
			},
		})
		if err != nil {
			return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", err)
		}

		for _, p := range r.Topics[topic] {
			if p.Partition == partition {
				if p.Error != nil {
					return nil, fmt.Errorf("kafka.(*Client).Fetch: %w", p.Error)
				}
				switch offset {
				case FirstOffset:
					offset = p.FirstOffset
				case LastOffset:
					offset = p.LastOffset
				}
				break
			}
		}
	}

	m, err := c.roundTrip(ctx, req.Addr, &fetchAPI.Request{
		ReplicaID:      -1,
		MaxWaitTime:    milliseconds(timeout),
		MinBytes:       int32(req.MinBytes),
		MaxBytes:       int32(req.MaxBytes),
		IsolationLevel: int8(req.IsolationLevel),
		SessionID:      -1,
		SessionEpoch:   -1,
		Topics: []fetchAPI.RequestTopic{{
			Topic: req.Topic,
			Partitions: []fetchAPI.RequestPartition{{
				Partition:          int32(req.Partition),
				CurrentLeaderEpoch: -1,
				FetchOffset:        offset,
				LogStartOffset:     -1,
				PartitionMaxBytes:  int32(req.MaxBytes),

Callers 7

TestClientFetch · Function · 0.80
TestClientPipeline · Function · 0.80
clientCreateTopic · Function · 0.80
clientCreateTopic · Function · 0.80
newLocalClientAndTopic · Function · 0.80

Calls 8

timeout · Method · 0.95
ListOffsets · Method · 0.95
roundTrip · Method · 0.95
milliseconds · Function · 0.85
makeDuration · Function · 0.85
makeError · Function · 0.85
maxWait · Method · 0.80
NewRecordReader · Function · 0.70

Tested by 7

TestClientFetch · Function · 0.64
TestClientPipeline · Function · 0.64
clientCreateTopic · Function · 0.64
clientCreateTopic · Function · 0.64
newLocalClientAndTopic · Function · 0.64