AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but instead of appending one record to a batch, it appends a new batch containing one record to the FetchResponse. Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions.
(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time)
| 660 | // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse |
| 661 | // Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions |
| 662 | func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) { |
| 663 | frb := r.getOrCreateBlock(topic, partition) |
| 664 | kb, vb := encodeKV(key, value) |
| 665 | |
| 666 | records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp}) |
| 667 | batch := &RecordBatch{ |
| 668 | Version: 2, |
| 669 | LogAppendTime: r.LogAppendTime, |
| 670 | FirstTimestamp: timestamp, |
| 671 | MaxTimestamp: r.Timestamp, |
| 672 | FirstOffset: offset, |
| 673 | LastOffsetDelta: 0, |
| 674 | ProducerID: producerID, |
| 675 | IsTransactional: isTransactional, |
| 676 | } |
| 677 | rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)} |
| 678 | batch.addRecord(rec) |
| 679 | records.RecordBatch = batch |
| 680 | |
| 681 | frb.RecordsSet = append(frb.RecordsSet, &records) |
| 682 | } |
| 683 | |
| 684 | func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) { |
| 685 | frb := r.getOrCreateBlock(topic, partition) |
no test coverage detected