(t *testing.T)
| 285 | } |
| 286 | |
| 287 | func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { |
| 288 | seedBroker := NewMockBroker(t, 1) |
| 289 | leader1 := NewMockBroker(t, 2) |
| 290 | leader2 := NewMockBroker(t, 3) |
| 291 | |
| 292 | metadataLeader1 := new(MetadataResponse) |
| 293 | metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) |
| 294 | metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) |
| 295 | seedBroker.Returns(metadataLeader1) |
| 296 | |
| 297 | config := NewTestConfig() |
| 298 | config.Producer.Retry.Max = 0 // disable! |
| 299 | config.Producer.Retry.Backoff = 0 |
| 300 | config.Producer.Return.Successes = true |
| 301 | producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) |
| 302 | if err != nil { |
| 303 | t.Fatal(err) |
| 304 | } |
| 305 | seedBroker.Close() |
| 306 | |
| 307 | prodNotLeader := new(ProduceResponse) |
| 308 | prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) |
| 309 | leader1.Returns(prodNotLeader) |
| 310 | _, _, err = producer.SendMessage(&ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}) |
| 311 | if !errors.Is(err, ErrNotLeaderForPartition) { |
| 312 | t.Fatal(err) |
| 313 | } |
| 314 | |
| 315 | metadataLeader2 := new(MetadataResponse) |
| 316 | metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) |
| 317 | metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) |
| 318 | leader1.Returns(metadataLeader2) |
| 319 | prodSuccess := new(ProduceResponse) |
| 320 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 321 | leader2.Returns(prodSuccess) |
| 322 | _, _, err = producer.SendMessage(&ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}) |
| 323 | if err != nil { |
| 324 | t.Fatal(err) |
| 325 | } |
| 326 | |
| 327 | leader1.Close() |
| 328 | leader2.Close() |
| 329 | safeClose(t, producer) |
| 330 | } |
| 331 | |
| 332 | // This example shows the basic usage pattern of the SyncProducer. |
| 333 | func ExampleSyncProducer() { |
nothing calls this directly
no test coverage detected