MCPcopy
hub / github.com/IBM/sarama / ExampleAsyncProducer_goroutines

Function ExampleAsyncProducer_goroutines

async_producer_test.go:2877–2923  ·  view source on GitHub ↗

This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.

()

Source from the content-addressed store, hash-verified

2875// for the Successes channel to be populated, you have to set
2876// config.Producer.Return.Successes to true.
2877func ExampleAsyncProducer_goroutines() {
2878 config := NewTestConfig()
2879 config.Producer.Return.Successes = true
2880 producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
2881 if err != nil {
2882 panic(err)
2883 }
2884
2885 // Trap SIGINT to trigger a graceful shutdown.
2886 signals := make(chan os.Signal, 1)
2887 signal.Notify(signals, os.Interrupt)
2888
2889 var (
2890 wg sync.WaitGroup
2891 enqueued, successes, producerErrors int
2892 )
2893
2894 wg.Go(func() {
2895 for range producer.Successes() {
2896 successes++
2897 }
2898 })
2899
2900 wg.Go(func() {
2901 for err := range producer.Errors() {
2902 log.Println(err)
2903 producerErrors++
2904 }
2905 })
2906
2907ProducerLoop:
2908 for {
2909 message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
2910 select {
2911 case producer.Input() <- message:
2912 enqueued++
2913
2914 case <-signals:
2915 producer.AsyncClose() // Trigger a shutdown of the producer.
2916 break ProducerLoop
2917 }
2918 }
2919
2920 wg.Wait()
2921
2922 log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
2923}
2924
2925// TestAsyncProducerRetryOrdering verifies that message ordering is preserved during retries,
2926// both with and without request pipelining (MaxOpenRequests=1 vs >1).

Callers

nothing calls this directly

Calls 9

SuccessesMethod · 0.95
ErrorsMethod · 0.95
InputMethod · 0.95
AsyncCloseMethod · 0.95
StringEncoderTypeAlias · 0.85
NewTestConfigFunction · 0.70
NewAsyncProducerFunction · 0.70
PrintlnMethod · 0.65
PrintfMethod · 0.65

Tested by

no test coverage detected