(t *testing.T)
| 109 | } |
| 110 | |
| 111 | func testWriteProduceRequestV2(t *testing.T) { |
| 112 | key := []byte(nil) |
| 113 | val := []byte("Hello World!") |
| 114 | |
| 115 | msg := messageSetItem{ |
| 116 | Offset: 10, |
| 117 | Message: message{ |
| 118 | MagicByte: 1, |
| 119 | Attributes: 0, |
| 120 | Key: key, |
| 121 | Value: val, |
| 122 | }, |
| 123 | } |
| 124 | msg.MessageSize = msg.Message.size() |
| 125 | msg.Message.CRC = msg.Message.crc32(&crc32Writer{ |
| 126 | table: crc32.IEEETable, |
| 127 | }) |
| 128 | |
| 129 | const timeout = 100 |
| 130 | testWriteOptimization(t, |
| 131 | requestHeader{ |
| 132 | ApiKey: int16(produce), |
| 133 | ApiVersion: int16(v2), |
| 134 | CorrelationID: testCorrelationID, |
| 135 | ClientID: testClientID, |
| 136 | }, |
| 137 | produceRequestV2{ |
| 138 | RequiredAcks: -1, |
| 139 | Timeout: timeout, |
| 140 | Topics: []produceRequestTopicV2{{ |
| 141 | TopicName: testTopic, |
| 142 | Partitions: []produceRequestPartitionV2{{ |
| 143 | Partition: testPartition, |
| 144 | MessageSetSize: msg.size(), MessageSet: messageSet{msg}, |
| 145 | }}, |
| 146 | }}, |
| 147 | }, |
| 148 | func(w *writeBuffer) { |
| 149 | w.writeProduceRequestV2(nil, testCorrelationID, testClientID, testTopic, testPartition, timeout*time.Millisecond, -1, Message{ |
| 150 | Offset: 10, |
| 151 | Key: key, |
| 152 | Value: val, |
| 153 | }) |
| 154 | }, |
| 155 | ) |
| 156 | } |
| 157 | |
| 158 | func testWriteOptimization(t *testing.T, h requestHeader, r request, f func(*writeBuffer)) { |
| 159 | b1 := &bytes.Buffer{} |
nothing calls this directly
no test coverage detected