(t *testing.T)
| 1473 | } |
| 1474 | |
| 1475 | func TestConsumerBounceWithReferenceOpen(t *testing.T) { |
| 1476 | broker0 := NewMockBroker(t, 0) |
| 1477 | broker0Addr := broker0.Addr() |
| 1478 | broker1 := NewMockBroker(t, 1) |
| 1479 | |
| 1480 | mockMetadataResponse := NewMockMetadataResponse(t). |
| 1481 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 1482 | SetBroker(broker1.Addr(), broker1.BrokerID()). |
| 1483 | SetLeader("my_topic", 0, broker0.BrokerID()). |
| 1484 | SetLeader("my_topic", 1, broker1.BrokerID()) |
| 1485 | |
| 1486 | mockOffsetResponse := NewMockOffsetResponse(t). |
| 1487 | SetOffset("my_topic", 0, OffsetOldest, 1000). |
| 1488 | SetOffset("my_topic", 0, OffsetNewest, 1100). |
| 1489 | SetOffset("my_topic", 1, OffsetOldest, 2000). |
| 1490 | SetOffset("my_topic", 1, OffsetNewest, 2100) |
| 1491 | |
| 1492 | mockFetchResponse := NewMockFetchResponse(t, 1) |
| 1493 | for i := range 10 { |
| 1494 | mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg) |
| 1495 | mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg) |
| 1496 | } |
| 1497 | |
| 1498 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 1499 | "OffsetRequest": mockOffsetResponse, |
| 1500 | "FetchRequest": mockFetchResponse, |
| 1501 | }) |
| 1502 | broker1.SetHandlerByMap(map[string]MockResponse{ |
| 1503 | "MetadataRequest": mockMetadataResponse, |
| 1504 | "OffsetRequest": mockOffsetResponse, |
| 1505 | "FetchRequest": mockFetchResponse, |
| 1506 | }) |
| 1507 | |
| 1508 | config := NewTestConfig() |
| 1509 | config.Consumer.Return.Errors = true |
| 1510 | config.Consumer.Retry.Backoff = 100 * time.Millisecond |
| 1511 | config.ChannelBufferSize = 1 |
| 1512 | master, err := NewConsumer([]string{broker1.Addr()}, config) |
| 1513 | if err != nil { |
| 1514 | t.Fatal(err) |
| 1515 | } |
| 1516 | |
| 1517 | c0, err := master.ConsumePartition("my_topic", 0, 1000) |
| 1518 | if err != nil { |
| 1519 | t.Fatal(err) |
| 1520 | } |
| 1521 | |
| 1522 | c1, err := master.ConsumePartition("my_topic", 1, 2000) |
| 1523 | if err != nil { |
| 1524 | t.Fatal(err) |
| 1525 | } |
| 1526 | |
| 1527 | // read messages from both partitions to make sure that both brokers operate |
| 1528 | // normally. |
| 1529 | assertMessageOffset(t, <-c0.Messages(), 1000) |
| 1530 | assertMessageOffset(t, <-c1.Messages(), 2000) |
| 1531 | |
| 1532 | // Simulate broker shutdown. Note that metadata response does not change, |
nothing calls this directly
no test coverage detected