| 178 | } |
| 179 | |
| 180 | func (ps *produceSet) buildRequest() *ProduceRequest { |
| 181 | req := &ProduceRequest{ |
| 182 | RequiredAcks: ps.parent.conf.Producer.RequiredAcks, |
| 183 | Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond), |
| 184 | } |
| 185 | if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { |
| 186 | req.Version = 2 |
| 187 | } |
| 188 | if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { |
| 189 | req.Version = 3 |
| 190 | if ps.parent.IsTransactional() { |
| 191 | req.TransactionalID = &ps.parent.conf.Producer.Transaction.ID |
| 192 | } |
| 193 | } |
| 194 | if ps.parent.conf.Version.IsAtLeast(V1_0_0_0) { |
| 195 | req.Version = 5 |
| 196 | } |
| 197 | if ps.parent.conf.Version.IsAtLeast(V2_0_0_0) { |
| 198 | req.Version = 6 |
| 199 | } |
| 200 | if ps.parent.conf.Version.IsAtLeast(V2_1_0_0) { |
| 201 | req.Version = 7 |
| 202 | } |
| 203 | if ps.parent.conf.Version.IsAtLeast(V2_4_0_0) { |
| 204 | req.Version = 8 |
| 205 | } |
| 206 | if ps.parent.conf.Version.IsAtLeast(V2_8_0_0) { |
| 207 | req.Version = 9 |
| 208 | } |
| 209 | |
| 210 | for topic, partitionSets := range ps.msgs { |
| 211 | for partition, set := range partitionSets { |
| 212 | if req.Version >= 3 { |
| 213 | // If the API version we're hitting is 3 or greater, we need to calculate |
| 214 | // offsets for each record in the batch relative to FirstOffset. |
| 215 | // Additionally, we must set LastOffsetDelta to the value of the last offset |
| 216 | // in the batch. Since the OffsetDelta of the first record is 0, we know that the |
| 217 | // final record of any batch will have an offset of (# of records in batch) - 1. |
| 218 | // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets |
| 219 | // under the RecordBatch section for details.) |
| 220 | rb := set.recordsToSend.RecordBatch |
| 221 | if len(rb.Records) > 0 { |
| 222 | rb.LastOffsetDelta = int32(len(rb.Records) - 1) |
| 223 | var maxTimestampDelta time.Duration |
| 224 | for i, record := range rb.Records { |
| 225 | record.OffsetDelta = int64(i) |
| 226 | maxTimestampDelta = max(maxTimestampDelta, record.TimestampDelta) |
| 227 | } |
| 228 | // Also set the MaxTimestamp similar to other clients. |
| 229 | rb.MaxTimestamp = rb.FirstTimestamp.Add(maxTimestampDelta) |
| 230 | } |
| 231 | |
| 232 | // Set the batch as transactional when a transactionalID is set |
| 233 | rb.IsTransactional = ps.parent.IsTransactional() |
| 234 | |
| 235 | req.AddBatch(topic, partition, rb) |
| 236 | continue |
| 237 | } |