(t *testing.T)
| 288 | } |
| 289 | |
| 290 | func TestProduceSetIdempotentRequestBuilding(t *testing.T) { |
| 291 | const pID = 1000 |
| 292 | const pEpoch = 1234 |
| 293 | |
| 294 | config := NewTestConfig() |
| 295 | config.Producer.RequiredAcks = WaitForAll |
| 296 | config.Producer.Idempotent = true |
| 297 | config.Version = V0_11_0_0 |
| 298 | |
| 299 | parent := &asyncProducer{ |
| 300 | conf: config, |
| 301 | txnmgr: &transactionManager{ |
| 302 | producerID: pID, |
| 303 | producerEpoch: pEpoch, |
| 304 | }, |
| 305 | } |
| 306 | ps := newProduceSet(parent) |
| 307 | |
| 308 | now := time.Now() |
| 309 | msg := &ProducerMessage{ |
| 310 | Topic: "t1", |
| 311 | Partition: 0, |
| 312 | Key: StringEncoder(TestMessage), |
| 313 | Value: StringEncoder(TestMessage), |
| 314 | Headers: []RecordHeader{ |
| 315 | { |
| 316 | Key: []byte("header-1"), |
| 317 | Value: []byte("value-1"), |
| 318 | }, |
| 319 | { |
| 320 | Key: []byte("header-2"), |
| 321 | Value: []byte("value-2"), |
| 322 | }, |
| 323 | { |
| 324 | Key: []byte("header-3"), |
| 325 | Value: []byte("value-3"), |
| 326 | }, |
| 327 | }, |
| 328 | Timestamp: now, |
| 329 | sequenceNumber: 123, |
| 330 | } |
| 331 | for range 10 { |
| 332 | safeAddMessage(t, ps, msg) |
| 333 | msg.Timestamp = msg.Timestamp.Add(time.Second) |
| 334 | } |
| 335 | |
| 336 | req := ps.buildRequest() |
| 337 | |
| 338 | if req.Version != 3 { |
| 339 | t.Error("Wrong request version") |
| 340 | } |
| 341 | |
| 342 | batch := req.records["t1"][0].RecordBatch |
| 343 | if !batch.FirstTimestamp.Equal(now.Truncate(time.Millisecond)) { |
| 344 | t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp) |
| 345 | } |
| 346 | if batch.ProducerID != pID { |
| 347 | t.Errorf("Wrong producerID: %v", batch.ProducerID) |
nothing calls this directly
no test coverage detected