MCPcopy
hub / github.com/IBM/sarama / AddMessageWithTimestamp

Method AddMessageWithTimestamp

fetch_response.go:631–645  ·  view source on GitHub ↗
(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8)

Source from the content-addressed store, hash-verified

629}
630
631func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
632 frb := r.getOrCreateBlock(topic, partition)
633 kb, vb := encodeKV(key, value)
634 if r.LogAppendTime {
635 timestamp = r.Timestamp
636 }
637 msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
638 msgBlock := &MessageBlock{Msg: msg, Offset: offset}
639 if len(frb.RecordsSet) == 0 {
640 records := newLegacyRecords(&MessageSet{})
641 frb.RecordsSet = []*Records{&records}
642 }
643 set := frb.RecordsSet[0].MsgSet
644 set.Messages = append(set.Messages, msgBlock)
645}
646
647func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
648 frb := r.getOrCreateBlock(topic, partition)

Callers 2

TestConsumerTimestampsFunction · 0.95
AddMessageMethod · 0.95

Calls 3

getOrCreateBlockMethod · 0.95
encodeKVFunction · 0.85
newLegacyRecordsFunction · 0.85

Tested by 1

TestConsumerTimestampsFunction · 0.76