MCPcopy
hub / github.com/segmentio/kafka-go / writeProduceRequestV2

Method writeProduceRequestV2

write.go:328–380  ·  write.go::writeBuffer.writeProduceRequestV2
(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message)

Source from the content-addressed store, hash-verified

326}
327
328func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
329 var size int32
330 var attributes int8
331 var compressed *bytes.Buffer
332
333 if codec == nil {
334 size = messageSetSize(msgs...)
335 } else {
336 compressed, attributes, size, err = compressMessageSet(codec, msgs...)
337 if err != nil {
338 return
339 }
340 msgs = []Message{{Value: compressed.Bytes()}}
341 }
342
343 h := requestHeader{
344 ApiKey: int16(produce),
345 ApiVersion: int16(v2),
346 CorrelationID: correlationID,
347 ClientID: clientID,
348 }
349 h.Size = (h.size() - 4) +
350 2 + // required acks
351 4 + // timeout
352 4 + // topic array length
353 sizeofString(topic) + // topic
354 4 + // partition array length
355 4 + // partition
356 4 + // message set size
357 size
358
359 h.writeTo(wb)
360 wb.writeInt16(requiredAcks) // required acks
361 wb.writeInt32(milliseconds(timeout))
362
363 // topic array
364 wb.writeArrayLen(1)
365 wb.writeString(topic)
366
367 // partition array
368 wb.writeArrayLen(1)
369 wb.writeInt32(partition)
370
371 wb.writeInt32(size)
372 cw := &crc32Writer{table: crc32.IEEETable}
373
374 for _, msg := range msgs {
375 wb.writeMessage(msg.Offset, attributes, msg.Time, msg.Key, msg.Value, cw)
376 }
377
378 releaseBuffer(compressed)
379 return wb.Flush()
380}
381
382func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
383

Callers 2

Calls 14

sizeMethod · 0.95
writeToMethod · 0.95
writeInt16Method · 0.95
writeInt32Method · 0.95
writeArrayLenMethod · 0.95
writeStringMethod · 0.95
writeMessageMethod · 0.95
FlushMethod · 0.95
messageSetSizeFunction · 0.85
compressMessageSetFunction · 0.85
sizeofStringFunction · 0.85
millisecondsFunction · 0.85

Tested by 1