MCPcopy
hub / github.com/segmentio/kafka-go / WriteMessages

Method WriteMessages

writer.go:609–696  ·  view source on GitHub ↗

WriteMessages writes a batch of messages to the Kafka topic configured on this writer. Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached. When sending synchronously and the … (summary truncated; see the full doc comment in writer.go)

(ctx context.Context, msgs ...Message)

Source from the content-addressed store, hash-verified

607// The program should assume that the whole batch failed and re-write the
608// messages later (which could then cause duplicates).
609func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
610 if w.Addr == nil {
611 return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
612 }
613
614 if !w.enter() {
615 return io.ErrClosedPipe
616 }
617 defer w.leave()
618
619 if len(msgs) == 0 {
620 return nil
621 }
622
623 balancer := w.balancer()
624 batchBytes := w.batchBytes()
625
626 for i := range msgs {
627 n := int64(msgs[i].totalSize())
628 if n > batchBytes {
629 // This error is left for backward compatibility with historical
630 // behavior, but it can yield O(N^2) behaviors. The expectations
631 // are that the program will check if WriteMessages returned a
632 // MessageTooLargeError, discard the message that was exceeding
633 // the maximum size, and try again.
634 return messageTooLarge(msgs, i)
635 }
636 }
637
638 // We use int32 here to halve the memory footprint (compared to using int
639 // on 64 bits architectures). We map lists of the message indexes instead
640 // of the message values for the same reason, int32 is 4 bytes, vs a full
641 // Message value which is 100+ bytes and contains pointers and contributes
642 // to increasing GC work.
643 assignments := make(map[topicPartition][]int32)
644
645 for i, msg := range msgs {
646 topic, err := w.chooseTopic(msg)
647 if err != nil {
648 return err
649 }
650
651 numPartitions, err := w.partitions(ctx, topic)
652 if err != nil {
653 return err
654 }
655
656 partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)
657
658 key := topicPartition{
659 topic: topic,
660 partition: int32(partition),
661 }
662
663 assignments[key] = append(assignments[key], int32(i))
664 }
665
666 batches := w.batchMessages(msgs, assignments)

Calls 13

enterMethod · 0.95
leaveMethod · 0.95
balancerMethod · 0.95
batchBytesMethod · 0.95
chooseTopicMethod · 0.95
partitionsMethod · 0.95
batchMessagesMethod · 0.95
messageTooLargeFunction · 0.85
loadCachedPartitionsFunction · 0.85
totalSizeMethod · 0.80
DoneMethod · 0.80
BalanceMethod · 0.65