MCPcopy
hub / github.com/segmentio/kafka-go / writeProduceRequestV3

Method writeProduceRequestV3

write.go:382–418  ·  view source on GitHub ↗
(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch)

Source from the content-addressed store, hash-verified

380}
381
382func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
383
384 h := requestHeader{
385 ApiKey: int16(produce),
386 ApiVersion: int16(v3),
387 CorrelationID: correlationID,
388 ClientID: clientID,
389 }
390
391 h.Size = (h.size() - 4) +
392 sizeofNullableString(transactionalID) +
393 2 + // required acks
394 4 + // timeout
395 4 + // topic array length
396 sizeofString(topic) + // topic
397 4 + // partition array length
398 4 + // partition
399 4 + // message set size
400 recordBatch.size
401
402 h.writeTo(wb)
403 wb.writeNullableString(transactionalID)
404 wb.writeInt16(requiredAcks) // required acks
405 wb.writeInt32(milliseconds(timeout))
406
407 // topic array
408 wb.writeArrayLen(1)
409 wb.writeString(topic)
410
411 // partition array
412 wb.writeArrayLen(1)
413 wb.writeInt32(partition)
414
415 recordBatch.writeTo(wb)
416
417 return wb.Flush()
418}
419
420func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
421

Callers 1

Calls 12

sizeMethod · 0.95
writeToMethod · 0.95
writeNullableStringMethod · 0.95
writeInt16Method · 0.95
writeInt32Method · 0.95
writeArrayLenMethod · 0.95
writeStringMethod · 0.95
FlushMethod · 0.95
sizeofNullableStringFunction · 0.85
sizeofStringFunction · 0.85
millisecondsFunction · 0.85
writeToMethod · 0.65

Tested by

no test coverage detected