(messages []Message, assignments map[topicPartition][]int32)
| 696 | } |
| 697 | |
| 698 | func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 { |
| 699 | var batches map[*writeBatch][]int32 |
| 700 | if !w.Async { |
| 701 | batches = make(map[*writeBatch][]int32, len(assignments)) |
| 702 | } |
| 703 | |
| 704 | w.mutex.Lock() |
| 705 | defer w.mutex.Unlock() |
| 706 | |
| 707 | if w.writers == nil { |
| 708 | w.writers = map[topicPartition]*partitionWriter{} |
| 709 | } |
| 710 | |
| 711 | for key, indexes := range assignments { |
| 712 | writer := w.writers[key] |
| 713 | if writer == nil { |
| 714 | writer = newPartitionWriter(w, key) |
| 715 | w.writers[key] = writer |
| 716 | } |
| 717 | wbatches := writer.writeMessages(messages, indexes) |
| 718 | |
| 719 | for batch, idxs := range wbatches { |
| 720 | batches[batch] = idxs |
| 721 | } |
| 722 | } |
| 723 | |
| 724 | return batches |
| 725 | } |
| 726 | |
| 727 | func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) { |
| 728 | timeout := w.writeTimeout() |
no test coverage detected