When two partitions have the same broker as their leader, a full channel buffer on one partition consumer does not prevent the other partition consumer from reading messages.
(t *testing.T)
| 1425 | // consumer channel buffer is full then that does not affect the ability to |
| 1426 | // read messages by the other consumer. |
| 1427 | func TestConsumerInterleavedClose(t *testing.T) { |
| 1428 | // Given |
| 1429 | broker0 := NewMockBroker(t, 0) |
| 1430 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 1431 | "MetadataRequest": NewMockMetadataResponse(t). |
| 1432 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 1433 | SetLeader("my_topic", 0, broker0.BrokerID()). |
| 1434 | SetLeader("my_topic", 1, broker0.BrokerID()), |
| 1435 | "OffsetRequest": NewMockOffsetResponse(t). |
| 1436 | SetOffset("my_topic", 0, OffsetOldest, 1000). |
| 1437 | SetOffset("my_topic", 0, OffsetNewest, 1100). |
| 1438 | SetOffset("my_topic", 1, OffsetOldest, 2000). |
| 1439 | SetOffset("my_topic", 1, OffsetNewest, 2100), |
| 1440 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 1441 | SetMessage("my_topic", 0, 1000, testMsg). |
| 1442 | SetMessage("my_topic", 0, 1001, testMsg). |
| 1443 | SetMessage("my_topic", 0, 1002, testMsg). |
| 1444 | SetMessage("my_topic", 1, 2000, testMsg), |
| 1445 | }) |
| 1446 | |
| 1447 | config := NewTestConfig() |
| 1448 | config.ChannelBufferSize = 0 |
| 1449 | master, err := NewConsumer([]string{broker0.Addr()}, config) |
| 1450 | if err != nil { |
| 1451 | t.Fatal(err) |
| 1452 | } |
| 1453 | |
| 1454 | c0, err := master.ConsumePartition("my_topic", 0, 1000) |
| 1455 | if err != nil { |
| 1456 | t.Fatal(err) |
| 1457 | } |
| 1458 | |
| 1459 | c1, err := master.ConsumePartition("my_topic", 1, 2000) |
| 1460 | if err != nil { |
| 1461 | t.Fatal(err) |
| 1462 | } |
| 1463 | |
| 1464 | // When/Then: we can read from partition 0 even if nobody reads from partition 1 |
| 1465 | assertMessageOffset(t, <-c0.Messages(), 1000) |
| 1466 | assertMessageOffset(t, <-c0.Messages(), 1001) |
| 1467 | assertMessageOffset(t, <-c0.Messages(), 1002) |
| 1468 | |
| 1469 | safeClose(t, c1) |
| 1470 | safeClose(t, c0) |
| 1471 | safeClose(t, master) |
| 1472 | broker0.Close() |
| 1473 | } |
| 1474 | |
| 1475 | func TestConsumerBounceWithReferenceOpen(t *testing.T) { |
| 1476 | broker0 := NewMockBroker(t, 0) |
nothing calls this directly
no test coverage detected