(topic string, partition int, offset int64)
| 424 | } |
| 425 | |
| 426 | func readPartition(topic string, partition int, offset int64) (msgs []Message, err error) { |
| 427 | var conn *Conn |
| 428 | |
| 429 | if conn, err = DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition); err != nil { |
| 430 | return |
| 431 | } |
| 432 | defer conn.Close() |
| 433 | |
| 434 | conn.Seek(offset, SeekAbsolute) |
| 435 | conn.SetReadDeadline(time.Now().Add(10 * time.Second)) |
| 436 | batch := conn.ReadBatch(0, 1000000000) |
| 437 | defer batch.Close() |
| 438 | |
| 439 | for { |
| 440 | var msg Message |
| 441 | |
| 442 | if msg, err = batch.ReadMessage(); err != nil { |
| 443 | if errors.Is(err, io.EOF) { |
| 444 | err = nil |
| 445 | } |
| 446 | return |
| 447 | } |
| 448 | |
| 449 | msgs = append(msgs, msg) |
| 450 | } |
| 451 | } |
| 452 | |
| 453 | func testWriterBatchBytes(t *testing.T) { |
| 454 | topic := makeTopic() |
no test coverage detected