(t *testing.T)
| 69 | } |
| 70 | |
| 71 | func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) { |
| 72 | consumer := NewConsumer(t, NewTestConfig()) |
| 73 | defer func() { |
| 74 | if err := consumer.Close(); err != nil { |
| 75 | t.Error(err) |
| 76 | } |
| 77 | }() |
| 78 | |
| 79 | consumePartitionT0P0 := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) |
| 80 | consumePartitionT0P1 := consumer.ExpectConsumePartition("test", 1, sarama.OffsetOldest) |
| 81 | consumePartitionT1P0 := consumer.ExpectConsumePartition("other", 0, AnyOffset) |
| 82 | |
| 83 | consumePartitionT0P0.Pause() |
| 84 | consumePartitionT0P0.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")}) |
| 85 | consumePartitionT0P0.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world x")}) |
| 86 | consumePartitionT0P0.YieldError(sarama.ErrOutOfBrokers) |
| 87 | |
| 88 | consumePartitionT0P1.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")}) |
| 89 | |
| 90 | consumePartitionT1P0.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello other")}) |
| 91 | |
| 92 | pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) |
| 93 | if err != nil { |
| 94 | t.Fatal(err) |
| 95 | } |
| 96 | if len(pc_test0.Messages()) > 0 { |
| 97 | t.Error("Problem to pause consumption") |
| 98 | } |
| 99 | test0_err := <-pc_test0.Errors() |
| 100 | if !errors.Is(test0_err, sarama.ErrOutOfBrokers) { |
| 101 | t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err) |
| 102 | } |
| 103 | |
| 104 | if pc_test0.HighWaterMarkOffset() != 0 { |
| 105 | t.Error("High water mark offset with value different from the expected: ", pc_test0.HighWaterMarkOffset()) |
| 106 | } |
| 107 | |
| 108 | pc_test1, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest) |
| 109 | if err != nil { |
| 110 | t.Fatal(err) |
| 111 | } |
| 112 | test1_msg := <-pc_test1.Messages() |
| 113 | if test1_msg.Topic != "test" || test1_msg.Partition != 1 || string(test1_msg.Value) != "hello world again" { |
| 114 | t.Error("Message was not as expected:", test1_msg) |
| 115 | } |
| 116 | |
| 117 | if pc_test1.HighWaterMarkOffset() != 1 { |
| 118 | t.Error("High water mark offset with value different from the expected: ", pc_test1.HighWaterMarkOffset()) |
| 119 | } |
| 120 | |
| 121 | pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetNewest) |
| 122 | if err != nil { |
| 123 | t.Fatal(err) |
| 124 | } |
| 125 | other0_msg := <-pc_other0.Messages() |
| 126 | if other0_msg.Topic != "other" || other0_msg.Partition != 0 || string(other0_msg.Value) != "hello other" { |
| 127 | t.Error("Message was not as expected:", other0_msg) |
| 128 | } |
Nothing calls this function directly.
No test coverage was detected for it.