MCPcopy
hub / github.com/IBM/sarama / produceMsgs

Function produceMsgs

functional_consumer_test.go:529–605  ·  functional_consumer_test.go::produceMsgs
(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool)

Source from the content-addressed store, hash-verified

527}
528
529func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
530 var (
531 producers []SyncProducer
532 producedMessagesMu sync.Mutex
533 producedMessages []*ProducerMessage
534 )
535 g := errgroup.Group{}
536 for _, prodVer := range clientVersions {
537 for _, codec := range codecs {
538 prodCfg := NewFunctionalTestConfig()
539 prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String()
540 if idempotent {
541 prodCfg.ClientID += "-idempotent"
542 }
543 if codec > 0 {
544 prodCfg.ClientID += "-" + codec.String()
545 }
546 prodCfg.Metadata.Full = false
547 prodCfg.Version = prodVer
548 prodCfg.Producer.Return.Successes = true
549 prodCfg.Producer.Return.Errors = true
550 prodCfg.Producer.Flush.MaxMessages = flush
551 prodCfg.Producer.Compression = codec
552 prodCfg.Producer.Idempotent = idempotent
553 if idempotent {
554 prodCfg.Producer.RequiredAcks = WaitForAll
555 prodCfg.Net.MaxOpenRequests = 1
556 }
557
558 p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
559 if err != nil {
560 t.Fatalf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
561 }
562 producers = append(producers, p)
563
564 g.Go(func() error {
565 t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
566 var wg sync.WaitGroup
567 for i := 0; i < countPerVerCodec; i++ {
568 msg := &ProducerMessage{
569 Topic: "test.1",
570 Value: StringEncoder(fmt.Sprintf("msg:%s:%s:%d", prodVer, codec, i)),
571 }
572 wg.Add(1)
573 go func() {
574 defer wg.Done()
575 _, _, err := p.SendMessage(msg)
576 if err != nil {
577 t.Errorf("Failed to produce message: %s, err=%v", msg.Value, err)
578 }
579 producedMessagesMu.Lock()
580 producedMessages = append(producedMessages, msg)
581 producedMessagesMu.Unlock()
582 }()
583 }
584 wg.Wait()
585 return nil
586 })

Callers 4

TestVersionMatrixFunction · 0.85
TestVersionMatrixLZ4Function · 0.85
TestVersionMatrixZstdFunction · 0.85

Calls 12

SendMessageMethod · 0.95
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
FatalfMethod · 0.80
FatalMethod · 0.80
NewSyncProducerFunction · 0.70
safeCloseFunction · 0.70
NameMethod · 0.65
DoneMethod · 0.65
ErrorfMethod · 0.65
StringMethod · 0.45
AddMethod · 0.45

Tested by

no test coverage detected