| 182 | } |
| 183 | |
| 184 | private void rollingWindowLoop() { |
| 185 | while (true) { |
| 186 | List<CompletableFuture<Void>> toComplete = new ArrayList<>(); |
| 187 | long sleepMs = 0; |
| 188 | |
| 189 | lock.lock(); |
| 190 | try { |
| 191 | if (queue.isEmpty()) { |
| 192 | running = false; |
| 193 | return; |
| 194 | } |
| 195 | |
| 196 | long now = milliseconds(); |
| 197 | |
| 198 | // Remove expired timestamps |
| 199 | long cutoff = now - (long) windowSize; |
| 200 | timestamps.removeIf(t -> t.timestamp <= cutoff); |
| 201 | double totalCost = timestamps.stream().mapToDouble(t -> t.cost).sum(); |
| 202 | |
| 203 | // Batch: complete all requests that fit within the window weight |
| 204 | while (!queue.isEmpty()) { |
| 205 | QueueElement first = queue.peek(); |
| 206 | if (totalCost + first.cost <= maxWeight) { |
| 207 | queue.poll(); |
| 208 | timestamps.add(new TimestampedCost(now, first.cost)); |
| 209 | totalCost += first.cost; |
| 210 | toComplete.add(first.future); |
| 211 | } else { |
| 212 | break; |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | // If queue still has items, compute exact wait until oldest entry expires |
| 217 | if (!queue.isEmpty()) { |
| 218 | if (!timestamps.isEmpty()) { |
| 219 | sleepMs = (timestamps.get(0).timestamp + (long) windowSize) - now; |
| 220 | if (sleepMs < 1) sleepMs = 1; |
| 221 | } else if (toComplete.isEmpty()) { |
| 222 | // Inner loop made no progress AND no history to wait on — prevents |
| 223 | // 100% CPU spin when head cost exceeds maxWeight (e.g. maxWeight=0). |
| 224 | sleepMs = Math.max(1, (long) (delay * 1000)); |
| 225 | } |
| 226 | } |
| 227 | } finally { |
| 228 | lock.unlock(); |
| 229 | } |
| 230 | |
| 231 | // Complete futures outside the lock |
| 232 | for (var f : toComplete) { |
| 233 | f.complete(null); |
| 234 | } |
| 235 | |
| 236 | if (sleepMs > 0) { |
| 237 | try { |
| 238 | Thread.sleep(sleepMs); |
| 239 | } catch (InterruptedException e) { |
| 240 | Thread.currentThread().interrupt(); |
| 241 | return; |