| 175 | } |
| 176 | |
| 177 | func (q *RequestQueue) getBatchBuffer(batchBuffer []Request, userID string, queue chan Request) []Request { |
| 178 | requestedCount := len(batchBuffer) |
| 179 | guaranteedInQueue := min(len(queue), requestedCount) |
| 180 | |
| 181 | totalWeight := 0 |
| 182 | actuallyInBatch := 0 |
| 183 | |
| 184 | for i := range guaranteedInQueue { |
| 185 | batchBuffer[i] = <-queue |
| 186 | actuallyInBatch++ |
| 187 | totalWeight += batchBuffer[i].Weight() |
| 188 | |
| 189 | if totalWeight >= requestedCount { |
| 190 | break |
| 191 | } |
| 192 | } |
| 193 | batchBuffer = batchBuffer[:actuallyInBatch] |
| 194 | |
| 195 | q.queueLength.WithLabelValues(userID).Set(float64(len(queue))) |
| 196 | q.batchWeight.WithLabelValues(userID).Observe(float64(totalWeight)) |
| 197 | |
| 198 | return batchBuffer |
| 199 | } |
| 200 | |
| 201 | func (q *RequestQueue) cleanupQueues(_ context.Context) error { |
| 202 | q.mtx.Lock() |