MCPcopy
hub / github.com/IBM/sarama / produceTestRecord

Function produceTestRecord

examples/txn_producer/main.go:115–162  ·  view source on GitHub ↗
(producerProvider *producerProvider)

Source from the content-addressed store, hash-verified

113}
114
115func produceTestRecord(producerProvider *producerProvider) {
116 producer := producerProvider.borrow()
117 defer producerProvider.release(producer)
118
119 // Start kafka transaction
120 err := producer.BeginTxn()
121 if err != nil {
122 log.Printf("unable to start txn %s\n", err)
123 return
124 }
125
126 // Produce some records in transaction
127 var i int64
128 for i = 0; i < recordsNumber; i++ {
129 producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder("test")}
130 }
131
132 // commit transaction
133 err = producer.CommitTxn()
134 if err != nil {
135 log.Printf("Producer: unable to commit txn %s\n", err)
136 for {
137 if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 {
138 // fatal error. need to recreate producer.
139 log.Printf("Producer: producer is in a fatal state, need to recreate it")
140 break
141 }
142 // If producer is in abortable state, try to abort current transaction.
143 if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 {
144 err = producer.AbortTxn()
145 if err != nil {
146 // If an error occured just retry it.
147 log.Printf("Producer: unable to abort transaction: %+v", err)
148 continue
149 }
150 break
151 }
152 // if not you can retry
153 err = producer.CommitTxn()
154 if err != nil {
155 log.Printf("Producer: unable to commit txn %s\n", err)
156 continue
157 }
158 }
159 return
160 }
161 recordsRate.Mark(recordsNumber)
162}
163
164// pool of producers that ensure transactional-id is unique.
165type producerProvider struct {

Callers 1

mainFunction · 0.85

Calls 9

StringEncoderTypeAlias · 0.92
BeginTxnMethod · 0.65
PrintfMethod · 0.65
InputMethod · 0.65
CommitTxnMethod · 0.65
TxnStatusMethod · 0.65
AbortTxnMethod · 0.65
borrowMethod · 0.45
releaseMethod · 0.45

Tested by

no test coverage detected