MCPcopy
hub / github.com/IBM/sarama / TestConsumerInterleavedClose

Function TestConsumerInterleavedClose

consumer_test.go:1427–1473  ·  view source on GitHub ↗

When two partitions have the same broker as their leader, a full channel buffer on one partition consumer does not affect the other partition consumer's ability to read messages.

(t *testing.T)

Source from the content-addressed store, hash-verified

1425// consumer channel buffer is full then that does not affect the ability to
1426// read messages by the other consumer.
1427func TestConsumerInterleavedClose(t *testing.T) {
1428 // Given
1429 broker0 := NewMockBroker(t, 0)
1430 broker0.SetHandlerByMap(map[string]MockResponse{
1431 "MetadataRequest": NewMockMetadataResponse(t).
1432 SetBroker(broker0.Addr(), broker0.BrokerID()).
1433 SetLeader("my_topic", 0, broker0.BrokerID()).
1434 SetLeader("my_topic", 1, broker0.BrokerID()),
1435 "OffsetRequest": NewMockOffsetResponse(t).
1436 SetOffset("my_topic", 0, OffsetOldest, 1000).
1437 SetOffset("my_topic", 0, OffsetNewest, 1100).
1438 SetOffset("my_topic", 1, OffsetOldest, 2000).
1439 SetOffset("my_topic", 1, OffsetNewest, 2100),
1440 "FetchRequest": NewMockFetchResponse(t, 1).
1441 SetMessage("my_topic", 0, 1000, testMsg).
1442 SetMessage("my_topic", 0, 1001, testMsg).
1443 SetMessage("my_topic", 0, 1002, testMsg).
1444 SetMessage("my_topic", 1, 2000, testMsg),
1445 })
1446
1447 config := NewTestConfig()
1448 config.ChannelBufferSize = 0
1449 master, err := NewConsumer([]string{broker0.Addr()}, config)
1450 if err != nil {
1451 t.Fatal(err)
1452 }
1453
1454 c0, err := master.ConsumePartition("my_topic", 0, 1000)
1455 if err != nil {
1456 t.Fatal(err)
1457 }
1458
1459 c1, err := master.ConsumePartition("my_topic", 1, 2000)
1460 if err != nil {
1461 t.Fatal(err)
1462 }
1463
1464 // When/Then: we can read from partition 0 even if nobody reads from partition 1
1465 assertMessageOffset(t, <-c0.Messages(), 1000)
1466 assertMessageOffset(t, <-c0.Messages(), 1001)
1467 assertMessageOffset(t, <-c0.Messages(), 1002)
1468
1469 safeClose(t, c1)
1470 safeClose(t, c0)
1471 safeClose(t, master)
1472 broker0.Close()
1473}
1474
1475func TestConsumerBounceWithReferenceOpen(t *testing.T) {
1476 broker0 := NewMockBroker(t, 0)

Callers

nothing calls this directly

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
SetLeaderMethod · 0.80
SetBrokerMethod · 0.80

Tested by

no test coverage detected