(t *testing.T)
| 132 | } |
| 133 | |
| 134 | func TestProducerWithTxn(t *testing.T) { |
| 135 | config := NewTestConfig() |
| 136 | config.Producer.Transaction.ID = "test" |
| 137 | config.Producer.RequiredAcks = sarama.WaitForAll |
| 138 | config.Producer.Retry.Backoff = 0 |
| 139 | config.Producer.Idempotent = true |
| 140 | config.Net.MaxOpenRequests = 1 |
| 141 | config.Version = sarama.V0_11_0_0 |
| 142 | |
| 143 | trm := newTestReporterMock() |
| 144 | mp := NewAsyncProducer(trm, config). |
| 145 | ExpectInputAndSucceed() |
| 146 | |
| 147 | if !mp.IsTransactional() { |
| 148 | t.Error("producer must be transactional") |
| 149 | } |
| 150 | |
| 151 | if err := mp.BeginTxn(); err != nil { |
| 152 | t.Error(err) |
| 153 | } |
| 154 | |
| 155 | if mp.TxnStatus()&sarama.ProducerTxnFlagInTransaction == 0 { |
| 156 | t.Error("transaction must be started") |
| 157 | } |
| 158 | |
| 159 | mp.Input() <- &sarama.ProducerMessage{Topic: "test"} |
| 160 | |
| 161 | if err := mp.AddMessageToTxn(&sarama.ConsumerMessage{ |
| 162 | Topic: "original-topic", |
| 163 | Partition: 0, |
| 164 | Offset: 123, |
| 165 | }, "test-group", nil); err != nil { |
| 166 | t.Error(err) |
| 167 | } |
| 168 | |
| 169 | if err := mp.AddOffsetsToTxn(map[string][]*sarama.PartitionOffsetMetadata{ |
| 170 | "original-topic": { |
| 171 | { |
| 172 | Partition: 1, |
| 173 | Offset: 321, |
| 174 | }, |
| 175 | }, |
| 176 | }, "test-group"); err != nil { |
| 177 | t.Error(err) |
| 178 | } |
| 179 | |
| 180 | if err := mp.CommitTxn(); err != nil { |
| 181 | t.Error(err) |
| 182 | } |
| 183 | |
| 184 | if err := mp.Close(); err != nil { |
| 185 | t.Error(err) |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | func TestProducerWithCheckerFunction(t *testing.T) { |
| 190 | trm := newTestReporterMock() |
nothing calls this directly
no test coverage detected