MCPcopy
hub / github.com/segmentio/kafka-go / produce

Method produce

writer.go:727–742  ·  view source on GitHub ↗
(key topicPartition, batch *writeBatch)

Source from the content-addressed store, hash-verified

725}
726
727func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
728 timeout := w.writeTimeout()
729
730 ctx, cancel := context.WithTimeout(context.Background(), timeout)
731 defer cancel()
732
733 return w.client(timeout).Produce(ctx, &ProduceRequest{
734 Partition: int(key.partition),
735 Topic: key.topic,
736 RequiredAcks: w.RequiredAcks,
737 Compression: w.Compression,
738 Records: &writerRecords{
739 msgs: batch.msgs,
740 },
741 })
742}
743
744func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
745 client := w.client(w.readTimeout())

Callers 1

writeBatchMethod · 0.80

Calls 3

writeTimeoutMethod · 0.95
clientMethod · 0.95
ProduceMethod · 0.80

Tested by

no test coverage detected