MCPcopy
hub / github.com/IBM/sarama / consumeMsgs

Function consumeMsgs

functional_consumer_test.go:607–668  ·  view source on GitHub ↗
(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage)

Source from the content-addressed store, hash-verified

605}
606
607func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
608 // Consume all produced messages with all client versions supported by the
609 // cluster.
610 g := errgroup.Group{}
611 for _, consVer := range clientVersions {
612 // Create a partition consumer that should start from the first produced
613 // message.
614 consCfg := NewFunctionalTestConfig()
615 consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String()
616 consCfg.Consumer.MaxProcessingTime = time.Second
617 consCfg.Metadata.Full = false
618 consCfg.Version = consVer
619 c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
620 if err != nil {
621 t.Fatal(err)
622 }
623 defer safeClose(t, c) //nolint: gocritic // the close intentionally happens outside the loop
624 pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
625 if err != nil {
626 t.Fatal(err)
627 }
628 defer safeClose(t, pc) //nolint: gocritic // the close intentionally happens outside the loop
629
630 var wg sync.WaitGroup
631 wg.Add(1)
632 g.Go(func() error {
633 // Consume as many messages as there have been produced and make sure that
634 // order is preserved.
635 t.Logf("*** Consuming with client version %s\n", consVer)
636 for i, prodMsg := range producedMessages {
637 select {
638 case consMsg := <-pc.Messages():
639 if consMsg.Offset != prodMsg.Offset {
640 t.Fatalf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s",
641 consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
642 }
643 if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) {
644 t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
645 consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
646 }
647 if i == 0 {
648 t.Logf("Consumed first msg: version=%s, index=%d, got=%s",
649 consVer, i, consMsg2Str(consMsg))
650 wg.Done()
651 }
652 if i%1000 == 0 {
653 t.Logf("Consumed messages: version=%s, index=%d, got=%s",
654 consVer, i, consMsg2Str(consMsg))
655 }
656 case <-time.After(15 * time.Second):
657 t.Fatalf("Timeout %s waiting for: index=%d, offset=%d, msg=%s",
658 consCfg.ClientID, i, prodMsg.Offset, prodMsg.Value)
659 }
660 }
661 return nil
662 })
663 wg.Wait() // wait for first message to be consumed before starting next consumer
664 }

Callers 4

TestVersionMatrix · Function · 0.85
TestVersionMatrixLZ4 · Function · 0.85
TestVersionMatrixZstd · Function · 0.85

Calls 13

ConsumePartition · Method · 0.95
NewFunctionalTestConfig · Function · 0.85
prodMsg2Str · Function · 0.85
consMsg2Str · Function · 0.85
Fatal · Method · 0.80
Fatalf · Method · 0.80
NewConsumer · Function · 0.70
safeClose · Function · 0.70
Name · Method · 0.65
Messages · Method · 0.65
Done · Method · 0.65
String · Method · 0.45

Tested by

no test coverage detected