(t *testing.T)
| 667 | } |
| 668 | |
| 669 | func testWriterMultipleTopics(t *testing.T) { |
| 670 | topic1 := makeTopic() |
| 671 | createTopic(t, topic1, 1) |
| 672 | defer deleteTopic(t, topic1) |
| 673 | |
| 674 | offset1, err := readOffset(topic1, 0) |
| 675 | if err != nil { |
| 676 | t.Fatal(err) |
| 677 | } |
| 678 | |
| 679 | topic2 := makeTopic() |
| 680 | createTopic(t, topic2, 1) |
| 681 | defer deleteTopic(t, topic2) |
| 682 | |
| 683 | offset2, err := readOffset(topic2, 0) |
| 684 | if err != nil { |
| 685 | t.Fatal(err) |
| 686 | } |
| 687 | |
| 688 | w := newTestWriter(WriterConfig{ |
| 689 | Balancer: &RoundRobin{}, |
| 690 | }) |
| 691 | defer w.Close() |
| 692 | |
| 693 | msg1 := Message{Topic: topic1, Value: []byte("Hello")} |
| 694 | msg2 := Message{Topic: topic2, Value: []byte("World")} |
| 695 | |
| 696 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 697 | defer cancel() |
| 698 | if err := w.WriteMessages(ctx, msg1, msg2); err != nil { |
| 699 | t.Error(err) |
| 700 | return |
| 701 | } |
| 702 | ws := w.Stats() |
| 703 | if ws.Writes != 2 { |
| 704 | t.Error("didn't batch messages; Writes: ", ws.Writes) |
| 705 | return |
| 706 | } |
| 707 | |
| 708 | msgs1, err := readPartition(topic1, 0, offset1) |
| 709 | if err != nil { |
| 710 | t.Error("error reading partition", err) |
| 711 | return |
| 712 | } |
| 713 | if len(msgs1) != 1 { |
| 714 | t.Error("bad messages in partition", msgs1) |
| 715 | return |
| 716 | } |
| 717 | if string(msgs1[0].Value) != "Hello" { |
| 718 | t.Error("bad message in partition", msgs1) |
| 719 | } |
| 720 | |
| 721 | msgs2, err := readPartition(topic2, 0, offset2) |
| 722 | if err != nil { |
| 723 | t.Error("error reading partition", err) |
| 724 | return |
| 725 | } |
| 726 | if len(msgs2) != 1 { |
nothing calls this directly
no test coverage detected