MCPcopy
hub / github.com/IBM/sarama / main

Function main

examples/txn_producer/main.go:54–113  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

52}
53
54func main() {
55 keepRunning := true
56 log.Println("Starting a new Sarama producer")
57
58 if verbose {
59 sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
60 }
61
62 version, err := sarama.ParseKafkaVersion(version)
63 if err != nil {
64 log.Panicf("Error parsing Kafka version: %v", err)
65 }
66
67 producerProvider := newProducerProvider(strings.Split(brokers, ","), func() *sarama.Config {
68 config := sarama.NewConfig()
69 config.Version = version
70 config.Producer.Idempotent = true
71 config.Producer.Return.Errors = false
72 config.Producer.RequiredAcks = sarama.WaitForAll
73 config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
74 config.Producer.Transaction.Retry.Backoff = 10
75 config.Producer.Transaction.ID = "txn_producer"
76 config.Net.MaxOpenRequests = 1
77 return config
78 })
79
80 go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.LstdFlags))
81
82 ctx, cancel := context.WithCancel(context.Background())
83
84 var wg sync.WaitGroup
85 for i := 0; i < producers; i++ {
86 wg.Add(1)
87 go func() {
88 defer wg.Done()
89
90 for {
91 select {
92 case <-ctx.Done():
93 return
94 default:
95 produceTestRecord(producerProvider)
96 }
97 }
98 }()
99 }
100
101 sigterm := make(chan os.Signal, 1)
102 signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
103
104 for keepRunning {
105 <-sigterm
106 log.Println("terminating: via signal")
107 keepRunning = false
108 }
109 cancel()
110 wg.Wait()
111

Callers

nothing calls this directly

Calls 8

ParseKafkaVersionFunction · 0.92
NewConfigFunction · 0.92
produceTestRecordFunction · 0.85
newProducerProviderFunction · 0.70
PrintlnMethod · 0.65
DoneMethod · 0.65
AddMethod · 0.45
clearMethod · 0.45

Tested by

no test coverage detected