MCPcopy
hub / github.com/segmentio/kafka-go / testWriterBatchBytesHeaders

Function testWriterBatchBytesHeaders

writer_test.go:608–667  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

606}
607
608func testWriterBatchBytesHeaders(t *testing.T) {
609 topic := makeTopic()
610 createTopic(t, topic, 1)
611 defer deleteTopic(t, topic)
612
613 offset, err := readOffset(topic, 0)
614 if err != nil {
615 t.Fatal(err)
616 }
617
618 w := newTestWriter(WriterConfig{
619 Topic: topic,
620 BatchBytes: 100,
621 BatchTimeout: 50 * time.Millisecond,
622 Balancer: &RoundRobin{},
623 })
624 defer w.Close()
625
626 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
627 defer cancel()
628 if err := w.WriteMessages(ctx, []Message{
629 {
630 Value: []byte("Hello World 1"),
631 Headers: []Header{
632 {Key: "User-Agent", Value: []byte("abc/xyz")},
633 },
634 },
635 {
636 Value: []byte("Hello World 2"),
637 Headers: []Header{
638 {Key: "User-Agent", Value: []byte("abc/xyz")},
639 },
640 },
641 }...); err != nil {
642 t.Error(err)
643 return
644 }
645 ws := w.Stats()
646 if ws.Writes != 2 {
647 t.Error("didn't batch messages; Writes: ", ws.Writes)
648 return
649 }
650 msgs, err := readPartition(topic, 0, offset)
651 if err != nil {
652 t.Error("error reading partition", err)
653 return
654 }
655
656 if len(msgs) != 2 {
657 t.Error("bad messages in partition", msgs)
658 return
659 }
660
661 for _, m := range msgs {
662 if strings.HasPrefix(string(m.Value), "Hello World") {
663 continue
664 }
665 t.Error("bad messages in partition", msgs)

Callers

nothing calls this directly

Calls 10

createTopicFunction · 0.85
deleteTopicFunction · 0.85
readOffsetFunction · 0.85
newTestWriterFunction · 0.85
readPartitionFunction · 0.85
makeTopicFunction · 0.70
CloseMethod · 0.45
WriteMessagesMethod · 0.45
ErrorMethod · 0.45
StatsMethod · 0.45

Tested by

no test coverage detected