(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch)
| 380 | } |
| 381 | |
| 382 | func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { |
| 383 | |
| 384 | h := requestHeader{ |
| 385 | ApiKey: int16(produce), |
| 386 | ApiVersion: int16(v3), |
| 387 | CorrelationID: correlationID, |
| 388 | ClientID: clientID, |
| 389 | } |
| 390 | |
| 391 | h.Size = (h.size() - 4) + |
| 392 | sizeofNullableString(transactionalID) + |
| 393 | 2 + // required acks |
| 394 | 4 + // timeout |
| 395 | 4 + // topic array length |
| 396 | sizeofString(topic) + // topic |
| 397 | 4 + // partition array length |
| 398 | 4 + // partition |
| 399 | 4 + // message set size |
| 400 | recordBatch.size |
| 401 | |
| 402 | h.writeTo(wb) |
| 403 | wb.writeNullableString(transactionalID) |
| 404 | wb.writeInt16(requiredAcks) // required acks |
| 405 | wb.writeInt32(milliseconds(timeout)) |
| 406 | |
| 407 | // topic array |
| 408 | wb.writeArrayLen(1) |
| 409 | wb.writeString(topic) |
| 410 | |
| 411 | // partition array |
| 412 | wb.writeArrayLen(1) |
| 413 | wb.writeInt32(partition) |
| 414 | |
| 415 | recordBatch.writeTo(wb) |
| 416 | |
| 417 | return wb.Flush() |
| 418 | } |
| 419 | |
| 420 | func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { |
| 421 |
no test coverage detected