MCPcopy
hub / github.com/IBM/sarama / handleTxnError

Method handleTxnError

examples/exactly_once/main.go:280–306  ·  view source on GitHub ↗
(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error)

Source from the content-addressed store, hash-verified

278}
279
280func (consumer *Consumer) handleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error) {
281 log.Printf("Message consumer: unable to process transaction: %+v", err)
282 for {
283 if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 {
284 // fatal error. need to recreate producer.
285 log.Printf("Message consumer: producer is in a fatal state, need to recreate it")
286 // reset current consumer offset to retry consume this record.
287 session.ResetOffset(message.Topic, message.Partition, message.Offset, "")
288 return
289 }
290 if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 {
291 err = producer.AbortTxn()
292 if err != nil {
293 log.Printf("Message consumer: unable to abort transaction: %+v", err)
294 continue
295 }
296 // reset current consumer offset to retry consume this record.
297 session.ResetOffset(message.Topic, message.Partition, message.Offset, "")
298 return
299 }
300 // if not you can retry
301 err = defaulthandler()
302 if err == nil {
303 return
304 }
305 }
306}
307
308type topicPartition struct {
309 topic string

Callers 1

ConsumeClaimMethod · 0.95

Implementers 2

consumerconsumer.go
Consumermocks/consumer.go

Calls 4

PrintfMethod · 0.65
TxnStatusMethod · 0.65
ResetOffsetMethod · 0.65
AbortTxnMethod · 0.65

Tested by

no test coverage detected