TestConsumerPartialBatchRecovery exercises the partial-batch recovery path (#1657): the consumer's initial Fetch.Default is smaller than the produced record batch, so the broker returns a partial batch and reports its true size. The follow-up fetch must use that reported size to deliver the message in a single retry instead of doubling its way up.
(t *testing.T)
| 85 | // size. The follow-up fetch must use that reported size to deliver the |
| 86 | // message in a single retry instead of doubling its way up. |
| 87 | func TestConsumerPartialBatchRecovery(t *testing.T) { |
| 88 | setupFunctionalTest(t) |
| 89 | defer teardownFunctionalTest(t) |
| 90 | |
| 91 | const payloadSize = 256 * 1024 |
| 92 | |
| 93 | producerConfig := NewFunctionalTestConfig() |
| 94 | producerConfig.Producer.Return.Successes = true |
| 95 | producerConfig.Producer.MaxMessageBytes = 2 * payloadSize |
| 96 | |
| 97 | p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, producerConfig) |
| 98 | assert.NoError(t, err) |
| 99 | defer safeClose(t, p) |
| 100 | |
| 101 | payload := make([]byte, payloadSize) |
| 102 | for i := range payload { |
| 103 | payload[i] = byte(i) |
| 104 | } |
| 105 | _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: ByteEncoder(payload)}) |
| 106 | assert.NoError(t, err) |
| 107 | |
| 108 | consumerConfig := NewFunctionalTestConfig() |
| 109 | // 12 bytes is the minimum that lets the broker include the per-batch length |
| 110 | // field in the partial response, so the consumer can size its retry exactly |
| 111 | consumerConfig.Consumer.Fetch.Default = 12 |
| 112 | consumerConfig.Consumer.Fetch.Max = 2 * payloadSize |
| 113 | |
| 114 | c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consumerConfig) |
| 115 | assert.NoError(t, err) |
| 116 | defer safeClose(t, c) |
| 117 | |
| 118 | pc, err := c.ConsumePartition("test.1", 0, offset) |
| 119 | assert.NoError(t, err) |
| 120 | defer safeClose(t, pc) |
| 121 | |
| 122 | select { |
| 123 | case msg := <-pc.Messages(): |
| 124 | assert.Equal(t, offset, msg.Offset) |
| 125 | assert.Equal(t, payload, msg.Value) |
| 126 | case err := <-pc.Errors(): |
| 127 | t.Fatalf("consumer error: %v", err) |
| 128 | case <-time.After(30 * time.Second): |
| 129 | t.Fatal("timed out waiting for the large message") |
| 130 | } |
| 131 | } |
| 132 | |
| 133 | // Makes sure that messages produced by all supported client versions/ |
| 134 | // compression codecs (except LZ4) combinations can be consumed by all |
nothing calls this directly
no test coverage detected