MCPcopy
hub / github.com/IBM/sarama / TestConsumerExpiryTicker

Function TestConsumerExpiryTicker

consumer_test.go:1606–1646  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1604}
1605
1606func TestConsumerExpiryTicker(t *testing.T) {
1607 // Given
1608 broker0 := NewMockBroker(t, 0)
1609 fetchResponse1 := &FetchResponse{}
1610 for i := 1; i <= 8; i++ {
1611 fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
1612 }
1613 broker0.SetHandlerByMap(map[string]MockResponse{
1614 "MetadataRequest": NewMockMetadataResponse(t).
1615 SetBroker(broker0.Addr(), broker0.BrokerID()).
1616 SetLeader("my_topic", 0, broker0.BrokerID()),
1617 "OffsetRequest": NewMockOffsetResponse(t).
1618 SetOffset("my_topic", 0, OffsetNewest, 1234).
1619 SetOffset("my_topic", 0, OffsetOldest, 1),
1620 "FetchRequest": NewMockSequence(fetchResponse1),
1621 })
1622
1623 config := NewTestConfig()
1624 config.ChannelBufferSize = 0
1625 config.Consumer.MaxProcessingTime = 10 * time.Millisecond
1626 master, err := NewConsumer([]string{broker0.Addr()}, config)
1627 if err != nil {
1628 t.Fatal(err)
1629 }
1630
1631 // When
1632 consumer, err := master.ConsumePartition("my_topic", 0, 1)
1633 if err != nil {
1634 t.Fatal(err)
1635 }
1636
1637 // Then: messages with offsets 1 through 8 are read
1638 for i := 1; i <= 8; i++ {
1639 assertMessageOffset(t, <-consumer.Messages(), int64(i))
1640 time.Sleep(2 * time.Millisecond)
1641 }
1642
1643 safeClose(t, consumer)
1644 safeClose(t, master)
1645 broker0.Close()
1646}
1647
1648func TestPartitionConsumerBrokerRace(t *testing.T) {
1649 oldMaxProcs := runtime.GOMAXPROCS(2)

Callers

nothing calls this directly

Calls 15

AddMessageMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockSequenceFunction · 0.85
assertMessageOffsetFunction · 0.85
SetLeaderMethod · 0.80

Tested by

no test coverage detected