MCPcopy
hub / github.com/IBM/sarama / main

Function main

examples/interceptors/main.go:25–83  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

23)
24
25func main() {
26 flag.Parse()
27
28 if *brokers == "" {
29 logger.Fatalln("at least one broker is required")
30 }
31 splitBrokers := strings.Split(*brokers, ",")
32 sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
33
34 version, err := sarama.ParseKafkaVersion(*version)
35 if err != nil {
36 log.Panicf("Error parsing Kafka version: %v", err)
37 }
38
39 // oTel stdout example
40 pusher, err := stdout.New()
41 if err != nil {
42 logger.Fatalf("failed to initialize stdout export pipeline: %v", err)
43 }
44 defer pusher.Shutdown(context.Background())
45
46 // simple sarama producer that adds a new producer interceptor
47 conf := sarama.NewConfig()
48 conf.Version = version
49 conf.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor(splitBrokers)}
50
51 producer, err := sarama.NewAsyncProducer(splitBrokers, conf)
52 if err != nil {
53 panic("Couldn't create a Kafka producer")
54 }
55 defer producer.AsyncClose()
56
57 // kill -2, trap SIGINT to trigger a shutdown
58 signals := make(chan os.Signal, 1)
59 signal.Notify(signals, os.Interrupt)
60
61 // ticker
62 bulkSize := 2
63 duration := 5 * time.Second
64 ticker := time.NewTicker(duration)
65 logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
66 for {
67 select {
68 case t := <-ticker.C:
69 now := t.Format(time.RFC3339)
70 logger.Printf("\nproducing %v messages to topic %s at %s", bulkSize, *topic, now)
71 for i := 0; i < bulkSize; i++ {
72 producer.Input() <- &sarama.ProducerMessage{
73 Topic: *topic, Key: nil,
74 Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
75 }
76 }
77 case <-signals:
78 logger.Println("terminating the program")
79 logger.Println("Bye :)")
80 return
81 }
82 }

Callers

nothing calls this directly

Calls 10

AsyncClose (Method) · 0.95
Input (Method) · 0.95
ParseKafkaVersion (Function) · 0.92
NewConfig (Function) · 0.92
NewAsyncProducer (Function) · 0.92
StringEncoder (TypeAlias) · 0.92
NewOTelInterceptor (Function) · 0.85
Fatalf (Method) · 0.80
Printf (Method) · 0.65
Println (Method) · 0.65

Tested by

no test coverage detected