MCPcopy
hub / github.com/segmentio/kafka-go / writeMessages

Method writeMessages

writer.go:1026–1063  ·  view source on GitHub ↗
(msgs []Message, indexes []int32)

Source from the content-addressed store, hash-verified

1024}
1025
1026func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
1027 ptw.mutex.Lock()
1028 defer ptw.mutex.Unlock()
1029
1030 batchSize := ptw.w.batchSize()
1031 batchBytes := ptw.w.batchBytes()
1032
1033 var batches map[*writeBatch][]int32
1034 if !ptw.w.Async {
1035 batches = make(map[*writeBatch][]int32, 1)
1036 }
1037
1038 for _, i := range indexes {
1039 assignMessage:
1040 batch := ptw.currBatch
1041 if batch == nil {
1042 batch = ptw.newWriteBatch()
1043 ptw.currBatch = batch
1044 }
1045 if !batch.add(msgs[i], batchSize, batchBytes) {
1046 batch.trigger()
1047 ptw.queue.Put(batch)
1048 ptw.currBatch = nil
1049 goto assignMessage
1050 }
1051
1052 if batch.full(batchSize, batchBytes) {
1053 batch.trigger()
1054 ptw.queue.Put(batch)
1055 ptw.currBatch = nil
1056 }
1057
1058 if !ptw.w.Async {
1059 batches[batch] = append(batches[batch], i)
1060 }
1061 }
1062 return batches
1063}
1064
1065// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
1066func (ptw *partitionWriter) newWriteBatch() *writeBatch {

Callers 1

batchMessagesMethod · 0.80

Calls 7

newWriteBatchMethod · 0.95
batchSizeMethod · 0.80
batchBytesMethod · 0.80
addMethod · 0.80
PutMethod · 0.80
triggerMethod · 0.45
fullMethod · 0.45

Tested by

no test coverage detected