(t *testing.T, config *Config)
| 413 | } |
| 414 | |
| 415 | func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { |
| 416 | // Given |
| 417 | broker0 := NewMockBroker(t, 100) |
| 418 | |
| 419 | // Stage 1: my_topic/0 served by broker0 |
| 420 | Logger.Printf(" STAGE 1") |
| 421 | |
| 422 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 423 | "MetadataRequest": NewMockMetadataResponse(t). |
| 424 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 425 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 426 | "OffsetRequest": NewMockOffsetResponse(t). |
| 427 | SetOffset("my_topic", 0, OffsetOldest, 123). |
| 428 | SetOffset("my_topic", 0, OffsetNewest, 1000), |
| 429 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 430 | SetMessage("my_topic", 0, 123, testMsg), |
| 431 | }) |
| 432 | |
| 433 | c, err := NewConsumer([]string{broker0.Addr()}, config) |
| 434 | if err != nil { |
| 435 | t.Fatal(err) |
| 436 | } |
| 437 | |
| 438 | pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest) |
| 439 | if err != nil { |
| 440 | t.Fatal(err) |
| 441 | } |
| 442 | |
| 443 | assertMessageOffset(t, <-pc.Messages(), 123) |
| 444 | |
| 445 | // Stage 2: broker0 says that it is no longer the leader for my_topic/0, |
| 446 | // but the requests to retrieve metadata fail with network timeout. |
| 447 | Logger.Printf(" STAGE 2") |
| 448 | |
| 449 | fetchResponse2 := &FetchResponse{} |
| 450 | fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition) |
| 451 | |
| 452 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 453 | "FetchRequest": NewMockWrapper(fetchResponse2), |
| 454 | }) |
| 455 | |
| 456 | if consErr := <-pc.Errors(); !errors.Is(consErr.Err, ErrOutOfBrokers) { |
| 457 | t.Errorf("Unexpected error: %v", consErr.Err) |
| 458 | } |
| 459 | |
| 460 | // Stage 3: finally, the metadata returned by broker0 reports that broker1 is |
| 461 | // the new leader for my_topic/0. Consumption resumes. |
| 462 | |
| 463 | Logger.Printf(" STAGE 3") |
| 464 | |
| 465 | broker1 := NewMockBroker(t, 101) |
| 466 | |
| 467 | broker1.SetHandlerByMap(map[string]MockResponse{ |
| 468 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 469 | SetMessage("my_topic", 0, 124, testMsg), |
| 470 | }) |
| 471 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 472 | "MetadataRequest": NewMockMetadataResponse(t). |
no test coverage detected