MCPcopy
hub / github.com/IBM/sarama / AddRecordWithTimestamp

Method AddRecordWithTimestamp

fetch_response.go:647–657  ·  view source on GitHub ↗
(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time)

Source from the content-addressed store, hash-verified

645}
646
647func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
648 frb := r.getOrCreateBlock(topic, partition)
649 kb, vb := encodeKV(key, value)
650 if len(frb.RecordsSet) == 0 {
651 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
652 frb.RecordsSet = []*Records{&records}
653 }
654 batch := frb.RecordsSet[0].RecordBatch
655 rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
656 batch.addRecord(rec)
657}
658
659// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
660// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse

Callers 2

TestConsumerTimestampsFunction · 0.95
AddRecordMethod · 0.95

Calls 4

getOrCreateBlockMethod · 0.95
encodeKVFunction · 0.85
newDefaultRecordsFunction · 0.85
addRecordMethod · 0.80

Tested by 1

TestConsumerTimestampsFunction · 0.76