MCPcopy
hub / github.com/IBM/sarama / add

Method add

produce_set.go:39–132  ·  view source on GitHub ↗
(msg *ProducerMessage)

Source from the content-addressed store, hash-verified

37}
38
39func (ps *produceSet) add(msg *ProducerMessage) error {
40 var err error
41 var key, val []byte
42
43 if msg.Key != nil {
44 if key, err = msg.Key.Encode(); err != nil {
45 return err
46 }
47 }
48
49 if msg.Value != nil {
50 if val, err = msg.Value.Encode(); err != nil {
51 return err
52 }
53 }
54
55 timestamp := msg.Timestamp
56 if timestamp.IsZero() {
57 timestamp = time.Now()
58 }
59 timestamp = timestamp.Truncate(time.Millisecond)
60
61 partitions := ps.msgs[msg.Topic]
62 if partitions == nil {
63 partitions = make(map[int32]*partitionSet)
64 ps.msgs[msg.Topic] = partitions
65 }
66
67 var size int
68
69 set := partitions[msg.Partition]
70 if set == nil {
71 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
72 batch := &RecordBatch{
73 FirstTimestamp: timestamp,
74 Version: 2,
75 Codec: ps.parent.conf.Producer.Compression,
76 CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
77 ProducerID: ps.producerID,
78 ProducerEpoch: ps.producerEpoch,
79 }
80 if ps.parent.conf.Producer.Idempotent {
81 batch.FirstSequence = msg.sequenceNumber
82 }
83 set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
84 size = recordBatchOverhead
85 } else {
86 set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
87 }
88 partitions[msg.Partition] = set
89 }
90
91 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
92 if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
93 return errors.New("assertion failed: message out of sequence added to a batch")
94 }
95 }
96

Callers 3

runMethod · 0.80
safeAddMessageFunction · 0.80

Calls 6

newDefaultRecordsFunction · 0.85
newLegacyRecordsFunction · 0.85
IsAtLeastMethod · 0.80
addRecordMethod · 0.80
addMessageMethod · 0.80
EncodeMethod · 0.65

Tested by 2

safeAddMessageFunction · 0.64