MCPcopy
hub / github.com/segmentio/kafka-go / readPartition

Function readPartition

writer_test.go:426–451  ·  view source on GitHub ↗
(topic string, partition int, offset int64)

Source from the content-addressed store, hash-verified

424}
425
426func readPartition(topic string, partition int, offset int64) (msgs []Message, err error) {
427 var conn *Conn
428
429 if conn, err = DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition); err != nil {
430 return
431 }
432 defer conn.Close()
433
434 conn.Seek(offset, SeekAbsolute)
435 conn.SetReadDeadline(time.Now().Add(10 * time.Second))
436 batch := conn.ReadBatch(0, 1000000000)
437 defer batch.Close()
438
439 for {
440 var msg Message
441
442 if msg, err = batch.ReadMessage(); err != nil {
443 if errors.Is(err, io.EOF) {
444 err = nil
445 }
446 return
447 }
448
449 msgs = append(msgs, msg)
450 }
451}
452
453func testWriterBatchBytes(t *testing.T) {
454 topic := makeTopic()

Callers 6

testWriterRoundRobin1Function · 0.85
testWriterBatchBytesFunction · 0.85
testWriterBatchSizeFunction · 0.85
testWriterMultipleTopicsFunction · 0.85

Calls 6

CloseMethod · 0.95
SeekMethod · 0.95
SetReadDeadlineMethod · 0.95
ReadBatchMethod · 0.95
DialLeaderFunction · 0.85
ReadMessageMethod · 0.45

Tested by

no test coverage detected