(t *testing.T)
| 1604 | } |
| 1605 | |
| 1606 | func TestConsumerExpiryTicker(t *testing.T) { |
| 1607 | // Given |
| 1608 | broker0 := NewMockBroker(t, 0) |
| 1609 | fetchResponse1 := &FetchResponse{} |
| 1610 | for i := 1; i <= 8; i++ { |
| 1611 | fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i)) |
| 1612 | } |
| 1613 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 1614 | "MetadataRequest": NewMockMetadataResponse(t). |
| 1615 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 1616 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 1617 | "OffsetRequest": NewMockOffsetResponse(t). |
| 1618 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 1619 | SetOffset("my_topic", 0, OffsetOldest, 1), |
| 1620 | "FetchRequest": NewMockSequence(fetchResponse1), |
| 1621 | }) |
| 1622 | |
| 1623 | config := NewTestConfig() |
| 1624 | config.ChannelBufferSize = 0 |
| 1625 | config.Consumer.MaxProcessingTime = 10 * time.Millisecond |
| 1626 | master, err := NewConsumer([]string{broker0.Addr()}, config) |
| 1627 | if err != nil { |
| 1628 | t.Fatal(err) |
| 1629 | } |
| 1630 | |
| 1631 | // When |
| 1632 | consumer, err := master.ConsumePartition("my_topic", 0, 1) |
| 1633 | if err != nil { |
| 1634 | t.Fatal(err) |
| 1635 | } |
| 1636 | |
| 1637 | // Then: messages with offsets 1 through 8 are read |
| 1638 | for i := 1; i <= 8; i++ { |
| 1639 | assertMessageOffset(t, <-consumer.Messages(), int64(i)) |
| 1640 | time.Sleep(2 * time.Millisecond) |
| 1641 | } |
| 1642 | |
| 1643 | safeClose(t, consumer) |
| 1644 | safeClose(t, master) |
| 1645 | broker0.Close() |
| 1646 | } |
| 1647 | |
| 1648 | func TestPartitionConsumerBrokerRace(t *testing.T) { |
| 1649 | oldMaxProcs := runtime.GOMAXPROCS(2) |
nothing calls this directly
no test coverage detected