MCPcopy
hub / github.com/IBM/sarama / TestProduceSetConsistentTimestamps

Function TestProduceSetConsistentTimestamps

produce_set_test.go:378–433  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

376}
377
378func TestProduceSetConsistentTimestamps(t *testing.T) {
379 parent, ps1 := makeProduceSet()
380 ps2 := newProduceSet(parent)
381 parent.conf.Producer.RequiredAcks = WaitForAll
382 parent.conf.Producer.Timeout = 10 * time.Second
383 parent.conf.Version = V0_11_0_0
384
385 msg1 := &ProducerMessage{
386 Topic: "t1",
387 Partition: 0,
388 Key: StringEncoder(TestMessage),
389 Value: StringEncoder(TestMessage),
390 Timestamp: time.Unix(1555718400, 500000000),
391 sequenceNumber: 123,
392 }
393 msg2 := &ProducerMessage{
394 Topic: "t1",
395 Partition: 0,
396 Key: StringEncoder(TestMessage),
397 Value: StringEncoder(TestMessage),
398 Timestamp: time.Unix(1555718400, 500900000),
399 sequenceNumber: 123,
400 }
401 msg3 := &ProducerMessage{
402 Topic: "t1",
403 Partition: 0,
404 Key: StringEncoder(TestMessage),
405 Value: StringEncoder(TestMessage),
406 Timestamp: time.Unix(1555718400, 600000000),
407 sequenceNumber: 123,
408 }
409
410 safeAddMessage(t, ps1, msg1)
411 safeAddMessage(t, ps1, msg3)
412 req1 := ps1.buildRequest()
413 if req1.Version != 3 {
414 t.Error("Wrong request version")
415 }
416 batch1 := req1.records["t1"][0].RecordBatch
417 ft1 := batch1.FirstTimestamp.Unix()*1000 + int64(batch1.FirstTimestamp.Nanosecond()/1000000)
418 time1 := ft1 + int64(batch1.Records[1].TimestampDelta/time.Millisecond)
419
420 safeAddMessage(t, ps2, msg2)
421 safeAddMessage(t, ps2, msg3)
422 req2 := ps2.buildRequest()
423 if req2.Version != 3 {
424 t.Error("Wrong request version")
425 }
426 batch2 := req2.records["t1"][0].RecordBatch
427 ft2 := batch2.FirstTimestamp.Unix()*1000 + int64(batch2.FirstTimestamp.Nanosecond()/1000000)
428 time2 := ft2 + int64(batch2.Records[1].TimestampDelta/time.Millisecond)
429
430 if time1 != time2 {
431 t.Errorf("Message timestamps do not match: %v, %v", time1, time2)
432 }
433}

Callers

nothing calls this directly

Calls 7

makeProduceSetFunction · 0.85
newProduceSetFunction · 0.85
StringEncoderTypeAlias · 0.85
safeAddMessageFunction · 0.85
buildRequestMethod · 0.80
ErrorMethod · 0.65
ErrorfMethod · 0.65

Tested by

no test coverage detected