(t *testing.T)
| 555 | } |
| 556 | |
| 557 | func testWriterSmallBatchBytes(t *testing.T) { |
| 558 | topic := makeTopic() |
| 559 | createTopic(t, topic, 1) |
| 560 | defer deleteTopic(t, topic) |
| 561 | |
| 562 | offset, err := readOffset(topic, 0) |
| 563 | if err != nil { |
| 564 | t.Fatal(err) |
| 565 | } |
| 566 | |
| 567 | w := newTestWriter(WriterConfig{ |
| 568 | Topic: topic, |
| 569 | BatchBytes: 25, |
| 570 | BatchTimeout: 50 * time.Millisecond, |
| 571 | Balancer: &RoundRobin{}, |
| 572 | }) |
| 573 | defer w.Close() |
| 574 | |
| 575 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 576 | defer cancel() |
| 577 | if err := w.WriteMessages(ctx, []Message{ |
| 578 | {Value: []byte("Hi")}, // 24 Bytes |
| 579 | {Value: []byte("By")}, // 24 Bytes |
| 580 | }...); err != nil { |
| 581 | t.Error(err) |
| 582 | return |
| 583 | } |
| 584 | ws := w.Stats() |
| 585 | if ws.Writes != 2 { |
| 586 | t.Error("didn't batch messages; Writes: ", ws.Writes) |
| 587 | return |
| 588 | } |
| 589 | msgs, err := readPartition(topic, 0, offset) |
| 590 | if err != nil { |
| 591 | t.Error("error reading partition", err) |
| 592 | return |
| 593 | } |
| 594 | |
| 595 | if len(msgs) != 2 { |
| 596 | t.Error("bad messages in partition", msgs) |
| 597 | return |
| 598 | } |
| 599 | |
| 600 | for _, m := range msgs { |
| 601 | if string(m.Value) == "Hi" || string(m.Value) == "By" { |
| 602 | continue |
| 603 | } |
| 604 | t.Error("bad messages in partition", msgs) |
| 605 | } |
| 606 | } |
| 607 | |
| 608 | func testWriterBatchBytesHeaders(t *testing.T) { |
| 609 | topic := makeTopic() |
nothing calls this directly
no test coverage detected