MCPcopy
hub / github.com/IBM/sarama / TestConsumerHandlesExpectationsPausingResuming

Function TestConsumerHandlesExpectationsPausingResuming

mocks/consumer_test.go:71–148  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

69}
70
71func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) {
72 consumer := NewConsumer(t, NewTestConfig())
73 defer func() {
74 if err := consumer.Close(); err != nil {
75 t.Error(err)
76 }
77 }()
78
79 consumePartitionT0P0 := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
80 consumePartitionT0P1 := consumer.ExpectConsumePartition("test", 1, sarama.OffsetOldest)
81 consumePartitionT1P0 := consumer.ExpectConsumePartition("other", 0, AnyOffset)
82
83 consumePartitionT0P0.Pause()
84 consumePartitionT0P0.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
85 consumePartitionT0P0.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world x")})
86 consumePartitionT0P0.YieldError(sarama.ErrOutOfBrokers)
87
88 consumePartitionT0P1.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
89
90 consumePartitionT1P0.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
91
92 pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
93 if err != nil {
94 t.Fatal(err)
95 }
96 if len(pc_test0.Messages()) > 0 {
97 t.Error("Problem to pause consumption")
98 }
99 test0_err := <-pc_test0.Errors()
100 if !errors.Is(test0_err, sarama.ErrOutOfBrokers) {
101 t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err)
102 }
103
104 if pc_test0.HighWaterMarkOffset() != 0 {
105 t.Error("High water mark offset with value different from the expected: ", pc_test0.HighWaterMarkOffset())
106 }
107
108 pc_test1, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
109 if err != nil {
110 t.Fatal(err)
111 }
112 test1_msg := <-pc_test1.Messages()
113 if test1_msg.Topic != "test" || test1_msg.Partition != 1 || string(test1_msg.Value) != "hello world again" {
114 t.Error("Message was not as expected:", test1_msg)
115 }
116
117 if pc_test1.HighWaterMarkOffset() != 1 {
118 t.Error("High water mark offset with value different from the expected: ", pc_test1.HighWaterMarkOffset())
119 }
120
121 pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetNewest)
122 if err != nil {
123 t.Fatal(err)
124 }
125 other0_msg := <-pc_other0.Messages()
126 if other0_msg.Topic != "other" || other0_msg.Partition != 0 || string(other0_msg.Value) != "hello other" {
127 t.Error("Message was not as expected:", other0_msg)
128 }

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
YieldMessageMethod · 0.80
YieldErrorMethod · 0.80
FatalMethod · 0.80
IsMethod · 0.80
NewConsumerFunction · 0.70
NewTestConfigFunction · 0.70
ErrorMethod · 0.65
PauseMethod · 0.65
MessagesMethod · 0.65

Tested by

no test coverage detected