awaitBatch waits for a batch to either fill up or time out. If the batch is full, it only stops the timer; if the timer expires, it queues the batch for writing if needed.
(batch *writeBatch)
| 1073 | // If the batch is full it only stops the timer, if the timer |
| 1074 | // expires it will queue the batch for writing if needed. |
| 1075 | func (ptw *partitionWriter) awaitBatch(batch *writeBatch) { |
| 1076 | select { |
| 1077 | case <-batch.timer.C: |
| 1078 | ptw.mutex.Lock() |
| 1079 | // detach the batch from the writer if we're still attached |
| 1080 | // and queue for writing. |
| 1081 | // Only the current batch can expire, all previous batches were already written to the queue. |
| 1082 | // If writeMesseages locks pw.mutex after the timer fires but before this goroutine |
| 1083 | // can lock pw.mutex it will either have filled the batch and enqueued it which will mean |
| 1084 | // pw.currBatch != batch so we just move on. |
| 1085 | // Otherwise, we detach the batch from the ptWriter and enqueue it for writing. |
| 1086 | if ptw.currBatch == batch { |
| 1087 | ptw.queue.Put(batch) |
| 1088 | ptw.currBatch = nil |
| 1089 | } |
| 1090 | ptw.mutex.Unlock() |
| 1091 | case <-batch.ready: |
| 1092 | // The batch became full, it was removed from the ptwriter and its |
| 1093 | // ready channel was closed. We need to close the timer to avoid |
| 1094 | // having it leak until it expires. |
| 1095 | batch.timer.Stop() |
| 1096 | } |
| 1097 | stats := ptw.w.stats() |
| 1098 | stats.batchQueueTime.observe(int64(time.Since(batch.time))) |
| 1099 | } |
| 1100 | |
| 1101 | func (ptw *partitionWriter) writeBatch(batch *writeBatch) { |
| 1102 | stats := ptw.w.stats() |
no test coverage detected