MCPcopy
hub / github.com/IBM/sarama / TestConsumerBounceWithReferenceOpen

Function TestConsumerBounceWithReferenceOpen

consumer_test.go:1475–1572  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1473}
1474
1475func TestConsumerBounceWithReferenceOpen(t *testing.T) {
1476 broker0 := NewMockBroker(t, 0)
1477 broker0Addr := broker0.Addr()
1478 broker1 := NewMockBroker(t, 1)
1479
1480 mockMetadataResponse := NewMockMetadataResponse(t).
1481 SetBroker(broker0.Addr(), broker0.BrokerID()).
1482 SetBroker(broker1.Addr(), broker1.BrokerID()).
1483 SetLeader("my_topic", 0, broker0.BrokerID()).
1484 SetLeader("my_topic", 1, broker1.BrokerID())
1485
1486 mockOffsetResponse := NewMockOffsetResponse(t).
1487 SetOffset("my_topic", 0, OffsetOldest, 1000).
1488 SetOffset("my_topic", 0, OffsetNewest, 1100).
1489 SetOffset("my_topic", 1, OffsetOldest, 2000).
1490 SetOffset("my_topic", 1, OffsetNewest, 2100)
1491
1492 mockFetchResponse := NewMockFetchResponse(t, 1)
1493 for i := range 10 {
1494 mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
1495 mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
1496 }
1497
1498 broker0.SetHandlerByMap(map[string]MockResponse{
1499 "OffsetRequest": mockOffsetResponse,
1500 "FetchRequest": mockFetchResponse,
1501 })
1502 broker1.SetHandlerByMap(map[string]MockResponse{
1503 "MetadataRequest": mockMetadataResponse,
1504 "OffsetRequest": mockOffsetResponse,
1505 "FetchRequest": mockFetchResponse,
1506 })
1507
1508 config := NewTestConfig()
1509 config.Consumer.Return.Errors = true
1510 config.Consumer.Retry.Backoff = 100 * time.Millisecond
1511 config.ChannelBufferSize = 1
1512 master, err := NewConsumer([]string{broker1.Addr()}, config)
1513 if err != nil {
1514 t.Fatal(err)
1515 }
1516
1517 c0, err := master.ConsumePartition("my_topic", 0, 1000)
1518 if err != nil {
1519 t.Fatal(err)
1520 }
1521
1522 c1, err := master.ConsumePartition("my_topic", 1, 2000)
1523 if err != nil {
1524 t.Fatal(err)
1525 }
1526
1527 // read messages from both partition to make sure that both brokers operate
1528 // normally.
1529 assertMessageOffset(t, <-c0.Messages(), 1000)
1530 assertMessageOffset(t, <-c1.Messages(), 2000)
1531
1532 // Simulate broker shutdown. Note that metadata response does not change,

Callers

nothing calls this directly

Calls 15

AddrMethod · 0.95
BrokerIDMethod · 0.95
SetMessageMethod · 0.95
SetHandlerByMapMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
NewMockBrokerAddrFunction · 0.85

Tested by

no test coverage detected