MCPcopy
hub / github.com/segmentio/kafka-go / readRecords

Function readRecords

fetch_test.go:170–207  ·  view source on GitHub ↗
(records RecordReader)

Source from the content-addressed store, hash-verified

168}
169
170func readRecords(records RecordReader) ([]memoryRecord, error) {
171 list := []memoryRecord{}
172
173 for {
174 rec, err := records.ReadRecord()
175
176 if err != nil {
177 if errors.Is(err, io.EOF) {
178 return list, nil
179 }
180 return nil, err
181 }
182
183 var (
184 offset = rec.Offset
185 key = rec.Key
186 value = rec.Value
187 headers = rec.Headers
188 bytesKey []byte
189 bytesValues []byte
190 )
191
192 if key != nil {
193 bytesKey, _ = ioutil.ReadAll(key)
194 }
195
196 if value != nil {
197 bytesValues, _ = ioutil.ReadAll(value)
198 }
199
200 list = append(list, memoryRecord{
201 offset: offset,
202 key: bytesKey,
203 value: bytesValues,
204 headers: headers,
205 })
206 }
207}
208
209func TestClientPipeline(t *testing.T) {
210 client, topic, shutdown := newLocalClientAndTopic()

Callers 1

assertFetchResponseFunction · 0.85

Calls 1

ReadRecordMethod · 0.65

Tested by

no test coverage detected