MCPcopy
hub / github.com/segmentio/kafka-go / testWriterMultipleTopics

Function testWriterMultipleTopics

writer_test.go:669–733  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

667}
668
669func testWriterMultipleTopics(t *testing.T) {
670 topic1 := makeTopic()
671 createTopic(t, topic1, 1)
672 defer deleteTopic(t, topic1)
673
674 offset1, err := readOffset(topic1, 0)
675 if err != nil {
676 t.Fatal(err)
677 }
678
679 topic2 := makeTopic()
680 createTopic(t, topic2, 1)
681 defer deleteTopic(t, topic2)
682
683 offset2, err := readOffset(topic2, 0)
684 if err != nil {
685 t.Fatal(err)
686 }
687
688 w := newTestWriter(WriterConfig{
689 Balancer: &RoundRobin{},
690 })
691 defer w.Close()
692
693 msg1 := Message{Topic: topic1, Value: []byte("Hello")}
694 msg2 := Message{Topic: topic2, Value: []byte("World")}
695
696 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
697 defer cancel()
698 if err := w.WriteMessages(ctx, msg1, msg2); err != nil {
699 t.Error(err)
700 return
701 }
702 ws := w.Stats()
703 if ws.Writes != 2 {
704 t.Error("didn't batch messages; Writes: ", ws.Writes)
705 return
706 }
707
708 msgs1, err := readPartition(topic1, 0, offset1)
709 if err != nil {
710 t.Error("error reading partition", err)
711 return
712 }
713 if len(msgs1) != 1 {
714 t.Error("bad messages in partition", msgs1)
715 return
716 }
717 if string(msgs1[0].Value) != "Hello" {
718 t.Error("bad message in partition", msgs1)
719 }
720
721 msgs2, err := readPartition(topic2, 0, offset2)
722 if err != nil {
723 t.Error("error reading partition", err)
724 return
725 }
726 if len(msgs2) != 1 {

Callers

nothing calls this directly

Calls 10

createTopic (Function) · 0.85
deleteTopic (Function) · 0.85
readOffset (Function) · 0.85
newTestWriter (Function) · 0.85
readPartition (Function) · 0.85
makeTopic (Function) · 0.70
Close (Method) · 0.45
WriteMessages (Method) · 0.45
Error (Method) · 0.45
Stats (Method) · 0.45

Tested by

no test coverage detected