| 645 | } |
| 646 | |
| 647 | func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) { |
| 648 | frb := r.getOrCreateBlock(topic, partition) |
| 649 | kb, vb := encodeKV(key, value) |
| 650 | if len(frb.RecordsSet) == 0 { |
| 651 | records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp}) |
| 652 | frb.RecordsSet = []*Records{&records} |
| 653 | } |
| 654 | batch := frb.RecordsSet[0].RecordBatch |
| 655 | rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)} |
| 656 | batch.addRecord(rec) |
| 657 | } |
| 658 | |
| 659 | // AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp |
| 660 | // But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse |