(t *testing.T)
| 209 | } |
| 210 | |
| 211 | func TestProducerWithBrokenPartitioner(t *testing.T) { |
| 212 | trm := newTestReporterMock() |
| 213 | config := NewTestConfig() |
| 214 | config.Producer.Partitioner = func(string) sarama.Partitioner { |
| 215 | return brokePartitioner{} |
| 216 | } |
| 217 | mp := NewAsyncProducer(trm, config) |
| 218 | mp.ExpectInputWithMessageCheckerFunctionAndSucceed(func(msg *sarama.ProducerMessage) error { |
| 219 | if msg.Partition != 15 { |
| 220 | t.Error("Expected partition 15, found: ", msg.Partition) |
| 221 | } |
| 222 | if msg.Topic != "test" { |
| 223 | t.Errorf(`Expected topic "test", found: %q`, msg.Topic) |
| 224 | } |
| 225 | return nil |
| 226 | }) |
| 227 | mp.ExpectInputAndSucceed() // should actually fail in partitioning |
| 228 | |
| 229 | mp.Input() <- &sarama.ProducerMessage{Topic: "test"} |
| 230 | mp.Input() <- &sarama.ProducerMessage{Topic: "not-test"} |
| 231 | if err := mp.Close(); err != nil { |
| 232 | t.Error(err) |
| 233 | } |
| 234 | |
| 235 | if len(trm.errors) != 1 || !strings.Contains(trm.errors[0], "partitioning unavailable") { |
| 236 | t.Error("Expected to report partitioning unavailable, found", trm.errors) |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | // brokeProducer refuses to partition anything not on the “test” topic, and sends everything on |
| 241 | // that topic to partition 15. |
nothing calls this directly
no test coverage detected