(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage)
| 605 | } |
| 606 | |
| 607 | func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) { |
| 608 | // Consume all produced messages with all client versions supported by the |
| 609 | // cluster. |
| 610 | g := errgroup.Group{} |
| 611 | for _, consVer := range clientVersions { |
| 612 | // Create a partition consumer that should start from the first produced |
| 613 | // message. |
| 614 | consCfg := NewFunctionalTestConfig() |
| 615 | consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String() |
| 616 | consCfg.Consumer.MaxProcessingTime = time.Second |
| 617 | consCfg.Metadata.Full = false |
| 618 | consCfg.Version = consVer |
| 619 | c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg) |
| 620 | if err != nil { |
| 621 | t.Fatal(err) |
| 622 | } |
| 623 | defer safeClose(t, c) //nolint: gocritic // the close intentionally happens outside the loop |
| 624 | pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset) |
| 625 | if err != nil { |
| 626 | t.Fatal(err) |
| 627 | } |
| 628 | defer safeClose(t, pc) //nolint: gocritic // the close intentionally happens outside the loop |
| 629 | |
| 630 | var wg sync.WaitGroup |
| 631 | wg.Add(1) |
| 632 | g.Go(func() error { |
| 633 | // Consume as many messages as there have been produced and make sure that |
| 634 | // order is preserved. |
| 635 | t.Logf("*** Consuming with client version %s\n", consVer) |
| 636 | for i, prodMsg := range producedMessages { |
| 637 | select { |
| 638 | case consMsg := <-pc.Messages(): |
| 639 | if consMsg.Offset != prodMsg.Offset { |
| 640 | t.Fatalf("Consumed unexpected offset: version=%s, index=%d, want=%s, got=%s", |
| 641 | consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) |
| 642 | } |
| 643 | if string(consMsg.Value) != string(prodMsg.Value.(StringEncoder)) { |
| 644 | t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s", |
| 645 | consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg)) |
| 646 | } |
| 647 | if i == 0 { |
| 648 | t.Logf("Consumed first msg: version=%s, index=%d, got=%s", |
| 649 | consVer, i, consMsg2Str(consMsg)) |
| 650 | wg.Done() |
| 651 | } |
| 652 | if i%1000 == 0 { |
| 653 | t.Logf("Consumed messages: version=%s, index=%d, got=%s", |
| 654 | consVer, i, consMsg2Str(consMsg)) |
| 655 | } |
| 656 | case <-time.After(15 * time.Second): |
| 657 | t.Fatalf("Timeout %s waiting for: index=%d, offset=%d, msg=%s", |
| 658 | consCfg.ClientID, i, prodMsg.Offset, prodMsg.Value) |
| 659 | } |
| 660 | } |
| 661 | return nil |
| 662 | }) |
| 663 | wg.Wait() // wait for first message to be consumed before starting next consumer |
| 664 | } |
no test coverage detected