(t *testing.T)
| 174 | } |
| 175 | |
| 176 | func TestProduceSetCompressedRequestBuilding(t *testing.T) { |
| 177 | parent, ps := makeProduceSet() |
| 178 | parent.conf.Producer.RequiredAcks = WaitForAll |
| 179 | parent.conf.Producer.Timeout = 10 * time.Second |
| 180 | parent.conf.Producer.Compression = CompressionGZIP |
| 181 | parent.conf.Version = V0_10_0_0 |
| 182 | |
| 183 | msg := &ProducerMessage{ |
| 184 | Topic: "t1", |
| 185 | Partition: 0, |
| 186 | Key: StringEncoder(TestMessage), |
| 187 | Value: StringEncoder(TestMessage), |
| 188 | Timestamp: time.Now(), |
| 189 | } |
| 190 | for range 10 { |
| 191 | safeAddMessage(t, ps, msg) |
| 192 | } |
| 193 | |
| 194 | req := ps.buildRequest() |
| 195 | |
| 196 | if req.Version != 2 { |
| 197 | t.Error("Wrong request version") |
| 198 | } |
| 199 | |
| 200 | for _, msgBlock := range req.records["t1"][0].MsgSet.Messages { |
| 201 | msg := msgBlock.Msg |
| 202 | err := msg.decodeSet() |
| 203 | if err != nil { |
| 204 | t.Error("Failed to decode set from payload") |
| 205 | } |
| 206 | for i, compMsgBlock := range msg.Set.Messages { |
| 207 | compMsg := compMsgBlock.Msg |
| 208 | if compMsg.Version != 1 { |
| 209 | t.Error("Wrong compressed message version") |
| 210 | } |
| 211 | if compMsgBlock.Offset != int64(i) { |
| 212 | t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset) |
| 213 | } |
| 214 | } |
| 215 | if msg.Version != 1 { |
| 216 | t.Error("Wrong compressed parent message version") |
| 217 | } |
| 218 | } |
| 219 | } |
| 220 | |
| 221 | func TestProduceSetV3RequestBuilding(t *testing.T) { |
| 222 | parent, ps := makeProduceSet() |
nothing calls this directly
no test coverage detected