| 37 | } |
| 38 | |
| 39 | func (ps *produceSet) add(msg *ProducerMessage) error { |
| 40 | var err error |
| 41 | var key, val []byte |
| 42 | |
| 43 | if msg.Key != nil { |
| 44 | if key, err = msg.Key.Encode(); err != nil { |
| 45 | return err |
| 46 | } |
| 47 | } |
| 48 | |
| 49 | if msg.Value != nil { |
| 50 | if val, err = msg.Value.Encode(); err != nil { |
| 51 | return err |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | timestamp := msg.Timestamp |
| 56 | if timestamp.IsZero() { |
| 57 | timestamp = time.Now() |
| 58 | } |
| 59 | timestamp = timestamp.Truncate(time.Millisecond) |
| 60 | |
| 61 | partitions := ps.msgs[msg.Topic] |
| 62 | if partitions == nil { |
| 63 | partitions = make(map[int32]*partitionSet) |
| 64 | ps.msgs[msg.Topic] = partitions |
| 65 | } |
| 66 | |
| 67 | var size int |
| 68 | |
| 69 | set := partitions[msg.Partition] |
| 70 | if set == nil { |
| 71 | if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { |
| 72 | batch := &RecordBatch{ |
| 73 | FirstTimestamp: timestamp, |
| 74 | Version: 2, |
| 75 | Codec: ps.parent.conf.Producer.Compression, |
| 76 | CompressionLevel: ps.parent.conf.Producer.CompressionLevel, |
| 77 | ProducerID: ps.producerID, |
| 78 | ProducerEpoch: ps.producerEpoch, |
| 79 | } |
| 80 | if ps.parent.conf.Producer.Idempotent { |
| 81 | batch.FirstSequence = msg.sequenceNumber |
| 82 | } |
| 83 | set = &partitionSet{recordsToSend: newDefaultRecords(batch)} |
| 84 | size = recordBatchOverhead |
| 85 | } else { |
| 86 | set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))} |
| 87 | } |
| 88 | partitions[msg.Partition] = set |
| 89 | } |
| 90 | |
| 91 | if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { |
| 92 | if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence { |
| 93 | return errors.New("assertion failed: message out of sequence added to a batch") |
| 94 | } |
| 95 | } |
| 96 | |