MCPcopy
hub / github.com/segmentio/kafka-go / writeProduceRequestV7

Method writeProduceRequestV7

write.go:420–455  ·  view source on GitHub ↗
(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch)

Source from the content-addressed store, hash-verified

418}
419
420func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
421
422 h := requestHeader{
423 ApiKey: int16(produce),
424 ApiVersion: int16(v7),
425 CorrelationID: correlationID,
426 ClientID: clientID,
427 }
428 h.Size = (h.size() - 4) +
429 sizeofNullableString(transactionalID) +
430 2 + // required acks
431 4 + // timeout
432 4 + // topic array length
433 sizeofString(topic) + // topic
434 4 + // partition array length
435 4 + // partition
436 4 + // message set size
437 recordBatch.size
438
439 h.writeTo(wb)
440 wb.writeNullableString(transactionalID)
441 wb.writeInt16(requiredAcks) // required acks
442 wb.writeInt32(milliseconds(timeout))
443
444 // topic array
445 wb.writeArrayLen(1)
446 wb.writeString(topic)
447
448 // partition array
449 wb.writeArrayLen(1)
450 wb.writeInt32(partition)
451
452 recordBatch.writeTo(wb)
453
454 return wb.Flush()
455}
456
457func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int, baseTime, lastTime time.Time, write func(*writeBuffer)) {
458 var (

Callers 1

Calls 12

sizeMethod · 0.95
writeToMethod · 0.95
writeNullableStringMethod · 0.95
writeInt16Method · 0.95
writeInt32Method · 0.95
writeArrayLenMethod · 0.95
writeStringMethod · 0.95
FlushMethod · 0.95
sizeofNullableStringFunction · 0.85
sizeofStringFunction · 0.85
millisecondsFunction · 0.85
writeToMethod · 0.65

Tested by

no test coverage detected