WriteMessages writes a batch of messages to the Kafka topic configured on this writer. Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written or until the maximum number of attempts has been reached. When sending synchronously and the …
(ctx context.Context, msgs ...Message)
| 607 | // The program should assume that the whole batch failed and re-write the |
| 608 | // messages later (which could then cause duplicates). |
| 609 | func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { |
| 610 | if w.Addr == nil { |
| 611 | return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address") |
| 612 | } |
| 613 | |
| 614 | if !w.enter() { |
| 615 | return io.ErrClosedPipe |
| 616 | } |
| 617 | defer w.leave() |
| 618 | |
| 619 | if len(msgs) == 0 { |
| 620 | return nil |
| 621 | } |
| 622 | |
| 623 | balancer := w.balancer() |
| 624 | batchBytes := w.batchBytes() |
| 625 | |
| 626 | for i := range msgs { |
| 627 | n := int64(msgs[i].totalSize()) |
| 628 | if n > batchBytes { |
| 629 | // This error is left for backward compatibility with historical |
| 630 | // behavior, but it can yield O(N^2) behaviors. The expectations |
| 631 | // are that the program will check if WriteMessages returned a |
| 632 | // MessageTooLargeError, discard the message that was exceeding |
| 633 | // the maximum size, and try again. |
| 634 | return messageTooLarge(msgs, i) |
| 635 | } |
| 636 | } |
| 637 | |
| 638 | // We use int32 here to halve the memory footprint (compared to using int |
| 639 | // on 64-bit architectures). We map lists of message indexes instead of |
| 640 | // the message values for the same reason: an int32 is 4 bytes, whereas a |
| 641 | // full Message value is 100+ bytes, contains pointers, and contributes |
| 642 | // to increasing GC work. |
| 643 | assignments := make(map[topicPartition][]int32) |
| 644 | |
| 645 | for i, msg := range msgs { |
| 646 | topic, err := w.chooseTopic(msg) |
| 647 | if err != nil { |
| 648 | return err |
| 649 | } |
| 650 | |
| 651 | numPartitions, err := w.partitions(ctx, topic) |
| 652 | if err != nil { |
| 653 | return err |
| 654 | } |
| 655 | |
| 656 | partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...) |
| 657 | |
| 658 | key := topicPartition{ |
| 659 | topic: topic, |
| 660 | partition: int32(partition), |
| 661 | } |
| 662 | |
| 663 | assignments[key] = append(assignments[key], int32(i)) |
| 664 | } |
| 665 | |
| 666 | batches := w.batchMessages(msgs, assignments) |