hub / github.com/IBM/sarama / TestConsumerPartialBatchRecovery

Function TestConsumerPartialBatchRecovery

functional_consumer_test.go:87–131  ·  functional_consumer_test.go::TestConsumerPartialBatchRecovery

TestConsumerPartialBatchRecovery exercises the partial-batch recovery path (#1657): the consumer's initial Fetch.Default is smaller than the produced record batch, so the broker returns a partial batch and reports its true size. The follow-up fetch must use that reported size to deliver the message in a single retry instead of doubling its way up.
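The difference between sizing the retry from the reported batch size and growing it by repeated doubling can be sketched as follows. This is a minimal illustration, not sarama's implementation: `nextFetchSize` and `doublingsNeeded` are hypothetical helpers, and the reported size of 256 KiB plus 100 bytes of batch overhead is an assumed figure chosen only for the example.

```go
package main

import (
	"fmt"
	"math"
)

// nextFetchSize is a hypothetical helper (not part of sarama's API).
// It picks the size of the follow-up fetch after a partial batch.
// reported is the full batch size learned from the partial response
// (0 if unknown); max is the configured upper bound (0 means no limit).
func nextFetchSize(current, reported, max int32) int32 {
	next := int64(current) * 2 // fallback: plain doubling
	if reported > current {
		next = int64(reported) // jump straight to the reported size
	}
	if max > 0 && next > int64(max) {
		next = int64(max)
	}
	if next > math.MaxInt32 {
		next = math.MaxInt32
	}
	return int32(next)
}

// doublingsNeeded counts how many retries pure doubling would take
// before the fetch size reaches target.
func doublingsNeeded(start, target int32) int {
	if start <= 0 {
		return 0
	}
	n := 0
	for size := int64(start); size < int64(target); size *= 2 {
		n++
	}
	return n
}

func main() {
	const payloadSize = 256 * 1024
	const assumedBatchSize = payloadSize + 100 // assumed overhead, for illustration

	// One retry, sized exactly from the reported batch size.
	fmt.Println(nextFetchSize(12, assumedBatchSize, 2*payloadSize)) // 262244

	// Retries needed if the consumer only doubled from 12 bytes.
	fmt.Println(doublingsNeeded(12, payloadSize)) // 15
}
```

With a 12-byte starting size, doubling alone would need 15 round trips just to cover the payload, which is the cost the single exact retry avoids.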

(t *testing.T)

Source from the content-addressed store, hash-verified

// size. The follow-up fetch must use that reported size to deliver the
// message in a single retry instead of doubling its way up.
func TestConsumerPartialBatchRecovery(t *testing.T) {
	setupFunctionalTest(t)
	defer teardownFunctionalTest(t)

	const payloadSize = 256 * 1024

	producerConfig := NewFunctionalTestConfig()
	producerConfig.Producer.Return.Successes = true
	producerConfig.Producer.MaxMessageBytes = 2 * payloadSize

	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, producerConfig)
	assert.NoError(t, err)
	defer safeClose(t, p)

	payload := make([]byte, payloadSize)
	for i := range payload {
		payload[i] = byte(i)
	}
	_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: ByteEncoder(payload)})
	assert.NoError(t, err)

	consumerConfig := NewFunctionalTestConfig()
	// 12 bytes is the minimum that lets the broker include the per-batch length
	// field in the partial response, so the consumer can size its retry exactly
	consumerConfig.Consumer.Fetch.Default = 12
	consumerConfig.Consumer.Fetch.Max = 2 * payloadSize

	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consumerConfig)
	assert.NoError(t, err)
	defer safeClose(t, c)

	pc, err := c.ConsumePartition("test.1", 0, offset)
	assert.NoError(t, err)
	defer safeClose(t, pc)

	select {
	case msg := <-pc.Messages():
		assert.Equal(t, offset, msg.Offset)
		assert.Equal(t, payload, msg.Value)
	case err := <-pc.Errors():
		t.Fatalf("consumer error: %v", err)
	case <-time.After(30 * time.Second):
		t.Fatal("timed out waiting for the large message")
	}
}

// Makes sure that messages produced by all supported client versions/
// compression codecs (except LZ4) combinations can be consumed by all

Callers

nothing calls this directly

Calls 13

SendMessage · Method · 0.95
ConsumePartition · Method · 0.95
setupFunctionalTest · Function · 0.85
teardownFunctionalTest · Function · 0.85
NewFunctionalTestConfig · Function · 0.85
ByteEncoder · TypeAlias · 0.85
Fatalf · Method · 0.80
Fatal · Method · 0.80
NewSyncProducer · Function · 0.70
safeClose · Function · 0.70
NewConsumer · Function · 0.70
Messages · Method · 0.65

Tested by

no test coverage detected