(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool)
| 527 | } |
| 528 | |
| 529 | func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage { |
| 530 | var ( |
| 531 | producers []SyncProducer |
| 532 | producedMessagesMu sync.Mutex |
| 533 | producedMessages []*ProducerMessage |
| 534 | ) |
| 535 | g := errgroup.Group{} |
| 536 | for _, prodVer := range clientVersions { |
| 537 | for _, codec := range codecs { |
| 538 | prodCfg := NewFunctionalTestConfig() |
| 539 | prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String() |
| 540 | if idempotent { |
| 541 | prodCfg.ClientID += "-idempotent" |
| 542 | } |
| 543 | if codec > 0 { |
| 544 | prodCfg.ClientID += "-" + codec.String() |
| 545 | } |
| 546 | prodCfg.Metadata.Full = false |
| 547 | prodCfg.Version = prodVer |
| 548 | prodCfg.Producer.Return.Successes = true |
| 549 | prodCfg.Producer.Return.Errors = true |
| 550 | prodCfg.Producer.Flush.MaxMessages = flush |
| 551 | prodCfg.Producer.Compression = codec |
| 552 | prodCfg.Producer.Idempotent = idempotent |
| 553 | if idempotent { |
| 554 | prodCfg.Producer.RequiredAcks = WaitForAll |
| 555 | prodCfg.Net.MaxOpenRequests = 1 |
| 556 | } |
| 557 | |
| 558 | p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg) |
| 559 | if err != nil { |
| 560 | t.Fatalf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err) |
| 561 | } |
| 562 | producers = append(producers, p) |
| 563 | |
| 564 | g.Go(func() error { |
| 565 | t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec) |
| 566 | var wg sync.WaitGroup |
| 567 | for i := 0; i < countPerVerCodec; i++ { |
| 568 | msg := &ProducerMessage{ |
| 569 | Topic: "test.1", |
| 570 | Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)), |
| 571 | } |
| 572 | wg.Add(1) |
| 573 | go func() { |
| 574 | defer wg.Done() |
| 575 | _, _, err := p.SendMessage(msg) |
| 576 | if err != nil { |
| 577 | t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err) |
| 578 | } |
| 579 | producedMessagesMu.Lock() |
| 580 | producedMessages = append(producedMessages, msg) |
| 581 | producedMessagesMu.Unlock() |
| 582 | }() |
| 583 | } |
| 584 | wg.Wait() |
| 585 | return nil |
| 586 | }) |
no test coverage detected