| 113 | } |
| 114 | |
| 115 | func produceTestRecord(producerProvider *producerProvider) { |
| 116 | producer := producerProvider.borrow() |
| 117 | defer producerProvider.release(producer) |
| 118 | |
| 119 | // Start kafka transaction |
| 120 | err := producer.BeginTxn() |
| 121 | if err != nil { |
| 122 | log.Printf("unable to start txn %s\n", err) |
| 123 | return |
| 124 | } |
| 125 | |
| 126 | // Produce some records in transaction |
| 127 | var i int64 |
| 128 | for i = 0; i < recordsNumber; i++ { |
| 129 | producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder("test")} |
| 130 | } |
| 131 | |
| 132 | // commit transaction |
| 133 | err = producer.CommitTxn() |
| 134 | if err != nil { |
| 135 | log.Printf("Producer: unable to commit txn %s\n", err) |
| 136 | for { |
| 137 | if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 { |
| 138 | // fatal error. need to recreate producer. |
| 139 | log.Printf("Producer: producer is in a fatal state, need to recreate it") |
| 140 | break |
| 141 | } |
| 142 | // If producer is in abortable state, try to abort current transaction. |
| 143 | if producer.TxnStatus()&sarama.ProducerTxnFlagAbortableError != 0 { |
| 144 | err = producer.AbortTxn() |
| 145 | if err != nil { |
| 146 | // If an error occured just retry it. |
| 147 | log.Printf("Producer: unable to abort transaction: %+v", err) |
| 148 | continue |
| 149 | } |
| 150 | break |
| 151 | } |
| 152 | // if not you can retry |
| 153 | err = producer.CommitTxn() |
| 154 | if err != nil { |
| 155 | log.Printf("Producer: unable to commit txn %s\n", err) |
| 156 | continue |
| 157 | } |
| 158 | } |
| 159 | return |
| 160 | } |
| 161 | recordsRate.Mark(recordsNumber) |
| 162 | } |
| 163 | |
| 164 | // pool of producers that ensure transactional-id is unique. |
| 165 | type producerProvider struct { |