(msgs []Message, indexes []int32)
| 1024 | } |
| 1025 | |
| 1026 | func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 { |
| 1027 | ptw.mutex.Lock() |
| 1028 | defer ptw.mutex.Unlock() |
| 1029 | |
| 1030 | batchSize := ptw.w.batchSize() |
| 1031 | batchBytes := ptw.w.batchBytes() |
| 1032 | |
| 1033 | var batches map[*writeBatch][]int32 |
| 1034 | if !ptw.w.Async { |
| 1035 | batches = make(map[*writeBatch][]int32, 1) |
| 1036 | } |
| 1037 | |
| 1038 | for _, i := range indexes { |
| 1039 | assignMessage: |
| 1040 | batch := ptw.currBatch |
| 1041 | if batch == nil { |
| 1042 | batch = ptw.newWriteBatch() |
| 1043 | ptw.currBatch = batch |
| 1044 | } |
| 1045 | if !batch.add(msgs[i], batchSize, batchBytes) { |
| 1046 | batch.trigger() |
| 1047 | ptw.queue.Put(batch) |
| 1048 | ptw.currBatch = nil |
| 1049 | goto assignMessage |
| 1050 | } |
| 1051 | |
| 1052 | if batch.full(batchSize, batchBytes) { |
| 1053 | batch.trigger() |
| 1054 | ptw.queue.Put(batch) |
| 1055 | ptw.currBatch = nil |
| 1056 | } |
| 1057 | |
| 1058 | if !ptw.w.Async { |
| 1059 | batches[batch] = append(batches[batch], i) |
| 1060 | } |
| 1061 | } |
| 1062 | return batches |
| 1063 | } |
| 1064 | |
| 1065 | // ptw.w can be accessed here because this is called with the lock ptw.mutex already held. |
| 1066 | func (ptw *partitionWriter) newWriteBatch() *writeBatch { |
no test coverage detected