(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error)
| 278 | } |
| 279 | |
| 280 | func (consumer *Consumer) handleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error) { |
| 281 | log.Printf("Message consumer: unable to process transaction: %+v", err) |
| 282 | for { |
| 283 | if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 { |
| 284 | // fatal error. need to recreate producer. |
| 285 | log.Printf("Message consumer: producer is in a fatal state, need to recreate it") |
| 286 | // reset current consumer offset to retry consume this record. |
| 287 | session.ResetOffset(message.Topic, message.Partition, message.Offset, "") |
| 288 | return |
| 289 | } |
| 290 | if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 { |
| 291 | err = producer.AbortTxn() |
| 292 | if err != nil { |
| 293 | log.Printf("Message consumer: unable to abort transaction: %+v", err) |
| 294 | continue |
| 295 | } |
| 296 | // reset current consumer offset to retry consume this record. |
| 297 | session.ResetOffset(message.Topic, message.Partition, message.Offset, "") |
| 298 | return |
| 299 | } |
| 300 | // if not you can retry |
| 301 | err = defaulthandler() |
| 302 | if err == nil { |
| 303 | return |
| 304 | } |
| 305 | } |
| 306 | } |
| 307 | |
| 308 | type topicPartition struct { |
| 309 | topic string |
no test coverage detected