MCPcopy
hub / github.com/segmentio/kafka-go / batchMessages

Method batchMessages

writer.go:698–725  ·  view source on GitHub ↗
(messages []Message, assignments map[topicPartition][]int32)

Source from the content-addressed store, hash-verified

696}
697
698func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
699 var batches map[*writeBatch][]int32
700 if !w.Async {
701 batches = make(map[*writeBatch][]int32, len(assignments))
702 }
703
704 w.mutex.Lock()
705 defer w.mutex.Unlock()
706
707 if w.writers == nil {
708 w.writers = map[topicPartition]*partitionWriter{}
709 }
710
711 for key, indexes := range assignments {
712 writer := w.writers[key]
713 if writer == nil {
714 writer = newPartitionWriter(w, key)
715 w.writers[key] = writer
716 }
717 wbatches := writer.writeMessages(messages, indexes)
718
719 for batch, idxs := range wbatches {
720 batches[batch] = idxs
721 }
722 }
723
724 return batches
725}
726
727func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
728 timeout := w.writeTimeout()

Callers 1

WriteMessages (Method) · 0.95

Calls 2

newPartitionWriter (Function) · 0.85
writeMessages (Method) · 0.80

Tested by

no test coverage detected