(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch)
| 418 | } |
| 419 | |
| 420 | func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { |
| 421 | |
| 422 | h := requestHeader{ |
| 423 | ApiKey: int16(produce), |
| 424 | ApiVersion: int16(v7), |
| 425 | CorrelationID: correlationID, |
| 426 | ClientID: clientID, |
| 427 | } |
| 428 | h.Size = (h.size() - 4) + |
| 429 | sizeofNullableString(transactionalID) + |
| 430 | 2 + // required acks |
| 431 | 4 + // timeout |
| 432 | 4 + // topic array length |
| 433 | sizeofString(topic) + // topic |
| 434 | 4 + // partition array length |
| 435 | 4 + // partition |
| 436 | 4 + // message set size |
| 437 | recordBatch.size |
| 438 | |
| 439 | h.writeTo(wb) |
| 440 | wb.writeNullableString(transactionalID) |
| 441 | wb.writeInt16(requiredAcks) // required acks |
| 442 | wb.writeInt32(milliseconds(timeout)) |
| 443 | |
| 444 | // topic array |
| 445 | wb.writeArrayLen(1) |
| 446 | wb.writeString(topic) |
| 447 | |
| 448 | // partition array |
| 449 | wb.writeArrayLen(1) |
| 450 | wb.writeInt32(partition) |
| 451 | |
| 452 | recordBatch.writeTo(wb) |
| 453 | |
| 454 | return wb.Flush() |
| 455 | } |
| 456 | |
| 457 | func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int, baseTime, lastTime time.Time, write func(*writeBuffer)) { |
| 458 | var ( |
no test coverage detected