(t *testing.T)
| 606 | } |
| 607 | |
| 608 | func testWriterBatchBytesHeaders(t *testing.T) { |
| 609 | topic := makeTopic() |
| 610 | createTopic(t, topic, 1) |
| 611 | defer deleteTopic(t, topic) |
| 612 | |
| 613 | offset, err := readOffset(topic, 0) |
| 614 | if err != nil { |
| 615 | t.Fatal(err) |
| 616 | } |
| 617 | |
| 618 | w := newTestWriter(WriterConfig{ |
| 619 | Topic: topic, |
| 620 | BatchBytes: 100, |
| 621 | BatchTimeout: 50 * time.Millisecond, |
| 622 | Balancer: &RoundRobin{}, |
| 623 | }) |
| 624 | defer w.Close() |
| 625 | |
| 626 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 627 | defer cancel() |
| 628 | if err := w.WriteMessages(ctx, []Message{ |
| 629 | { |
| 630 | Value: []byte("Hello World 1"), |
| 631 | Headers: []Header{ |
| 632 | {Key: "User-Agent", Value: []byte("abc/xyz")}, |
| 633 | }, |
| 634 | }, |
| 635 | { |
| 636 | Value: []byte("Hello World 2"), |
| 637 | Headers: []Header{ |
| 638 | {Key: "User-Agent", Value: []byte("abc/xyz")}, |
| 639 | }, |
| 640 | }, |
| 641 | }...); err != nil { |
| 642 | t.Error(err) |
| 643 | return |
| 644 | } |
| 645 | ws := w.Stats() |
| 646 | if ws.Writes != 2 { |
| 647 | t.Error("didn't batch messages; Writes: ", ws.Writes) |
| 648 | return |
| 649 | } |
| 650 | msgs, err := readPartition(topic, 0, offset) |
| 651 | if err != nil { |
| 652 | t.Error("error reading partition", err) |
| 653 | return |
| 654 | } |
| 655 | |
| 656 | if len(msgs) != 2 { |
| 657 | t.Error("bad messages in partition", msgs) |
| 658 | return |
| 659 | } |
| 660 | |
| 661 | for _, m := range msgs { |
| 662 | if strings.HasPrefix(string(m.Value), "Hello World") { |
| 663 | continue |
| 664 | } |
| 665 | t.Error("bad messages in partition", msgs) |
nothing calls this directly
no test coverage detected