This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.
()
| 2875 | // for the Successes channel to be populated, you have to set |
| 2876 | // config.Producer.Return.Successes to true. |
| 2877 | func ExampleAsyncProducer_goroutines() { |
| 2878 | config := NewTestConfig() |
| 2879 | config.Producer.Return.Successes = true |
| 2880 | producer, err := NewAsyncProducer([]string{"localhost:9092"}, config) |
| 2881 | if err != nil { |
| 2882 | panic(err) |
| 2883 | } |
| 2884 | |
| 2885 | // Trap SIGINT to trigger a graceful shutdown. |
| 2886 | signals := make(chan os.Signal, 1) |
| 2887 | signal.Notify(signals, os.Interrupt) |
| 2888 | |
| 2889 | var ( |
| 2890 | wg sync.WaitGroup |
| 2891 | enqueued, successes, producerErrors int |
| 2892 | ) |
| 2893 | |
| 2894 | wg.Go(func() { |
| 2895 | for range producer.Successes() { |
| 2896 | successes++ |
| 2897 | } |
| 2898 | }) |
| 2899 | |
| 2900 | wg.Go(func() { |
| 2901 | for err := range producer.Errors() { |
| 2902 | log.Println(err) |
| 2903 | producerErrors++ |
| 2904 | } |
| 2905 | }) |
| 2906 | |
| 2907 | ProducerLoop: |
| 2908 | for { |
| 2909 | message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")} |
| 2910 | select { |
| 2911 | case producer.Input() <- message: |
| 2912 | enqueued++ |
| 2913 | |
| 2914 | case <-signals: |
| 2915 | producer.AsyncClose() // Trigger a shutdown of the producer. |
| 2916 | break ProducerLoop |
| 2917 | } |
| 2918 | } |
| 2919 | |
| 2920 | wg.Wait() |
| 2921 | |
| 2922 | log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors) |
| 2923 | } |
| 2924 | |
| 2925 | // TestAsyncProducerRetryOrdering verifies that message ordering is preserved during retries, |
| 2926 | // both with and without request pipelining (MaxOpenRequests=1 vs >1). |
nothing calls this directly
no test coverage detected