(pe packetEncoder)
| 70 | } |
| 71 | |
| 72 | func (r *ProduceRequest) encode(pe packetEncoder) error { |
| 73 | if r.Version >= 3 { |
| 74 | if err := pe.putNullableString(r.TransactionalID); err != nil { |
| 75 | return err |
| 76 | } |
| 77 | } |
| 78 | pe.putInt16(int16(r.RequiredAcks)) |
| 79 | pe.putInt32(r.Timeout) |
| 80 | metricRegistry := pe.metricRegistry() |
| 81 | var batchSizeMetric metrics.Histogram |
| 82 | var compressionRatioMetric metrics.Histogram |
| 83 | if metricRegistry != nil { |
| 84 | batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry) |
| 85 | compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry) |
| 86 | } |
| 87 | totalRecordCount := int64(0) |
| 88 | |
| 89 | err := pe.putArrayLength(len(r.records)) |
| 90 | if err != nil { |
| 91 | return err |
| 92 | } |
| 93 | |
| 94 | for topic, partitions := range r.records { |
| 95 | err = pe.putString(topic) |
| 96 | if err != nil { |
| 97 | return err |
| 98 | } |
| 99 | err = pe.putArrayLength(len(partitions)) |
| 100 | if err != nil { |
| 101 | return err |
| 102 | } |
| 103 | topicRecordCount := int64(0) |
| 104 | var topicCompressionRatioMetric metrics.Histogram |
| 105 | var topicBatchSizeMetric metrics.Histogram |
| 106 | if metricRegistry != nil { |
| 107 | topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry) |
| 108 | topicBatchSizeMetric = getOrRegisterTopicHistogram("batch-size", topic, metricRegistry) |
| 109 | } |
| 110 | for id, records := range partitions { |
| 111 | startOffset := pe.offset() |
| 112 | pe.putInt32(id) |
| 113 | if r.Version >= 9 { |
| 114 | // compact bytes: size the records with a prep pass to write |
| 115 | // the uvarint prefix, then encode the batch non-flexibly |
| 116 | var prep prepEncoder |
| 117 | if err = records.encode(&prep); err != nil { |
| 118 | return err |
| 119 | } |
| 120 | pe.putUVarint(uint64(prep.length) + 1) |
| 121 | if err = records.encode(downgradeFlexibleEncoder(pe)); err != nil { |
| 122 | return err |
| 123 | } |
| 124 | } else { |
| 125 | pe.push(&lengthField{}) |
| 126 | err = records.encode(pe) |
| 127 | if err != nil { |
| 128 | return err |
| 129 | } |
nothing calls this directly
no test coverage detected