MCPcopy
hub / github.com/segmentio/kafka-go / awaitBatch

Method awaitBatch

writer.go:1075–1099  ·  view source on GitHub ↗

awaitBatch waits for a batch to either fill up or time out. If the batch is full it only stops the timer, if the timer expires it will queue the batch for writing if needed.

(batch *writeBatch)

Source from the content-addressed store, hash-verified

1073// If the batch is full it only stops the timer, if the timer
1074// expires it will queue the batch for writing if needed.
1075func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
1076 select {
1077 case <-batch.timer.C:
1078 ptw.mutex.Lock()
1079 // detach the batch from the writer if we're still attached
1080 // and queue for writing.
1081 // Only the current batch can expire, all previous batches were already written to the queue.
1082 // If writeMesseages locks pw.mutex after the timer fires but before this goroutine
1083 // can lock pw.mutex it will either have filled the batch and enqueued it which will mean
1084 // pw.currBatch != batch so we just move on.
1085 // Otherwise, we detach the batch from the ptWriter and enqueue it for writing.
1086 if ptw.currBatch == batch {
1087 ptw.queue.Put(batch)
1088 ptw.currBatch = nil
1089 }
1090 ptw.mutex.Unlock()
1091 case <-batch.ready:
1092 // The batch became full, it was removed from the ptwriter and its
1093 // ready channel was closed. We need to close the timer to avoid
1094 // having it leak until it expires.
1095 batch.timer.Stop()
1096 }
1097 stats := ptw.w.stats()
1098 stats.batchQueueTime.observe(int64(time.Since(batch.time)))
1099}
1100
1101func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
1102 stats := ptw.w.stats()

Callers 1

newWriteBatchMethod · 0.95

Calls 3

PutMethod · 0.80
statsMethod · 0.80
observeMethod · 0.45

Tested by

no test coverage detected