Get a batch of SCCs from the queue to submit to a worker. SCCs are batched to reduce communication overhead, but to avoid long poles, the fraction of work given to each worker is limited.
(self, max_size_in_batch: int)
| 1409 | self.scc_queue.extend([(0, 0, scc) for scc in sccs]) |
| 1410 | |
def get_scc_batch(self, max_size_in_batch: int) -> list[SCC]:
    """Pop a batch of SCCs off the priority queue for one worker.

    Sending several SCCs at once cuts communication overhead. Capping
    the total size of a batch keeps any single worker from becoming a
    long pole.

    ``self.scc_queue`` is a heap of 3-tuples whose first element is the
    *negated* SCC size, so the largest remaining SCC sits at index 0.
    The second element is ignored here. SCCs are taken greedily in heap
    order until the next one would push the batch total past
    ``max_size_in_batch``. The first SCC is always taken, even if it
    alone exceeds the limit, so the result is non-empty whenever the
    queue is non-empty. Each taken SCC's size is subtracted from
    ``self.size_in_queue``.
    """
    queue = self.scc_queue
    taken: list[SCC] = []
    total = 0
    while queue:
        next_size = -queue[0][0]  # heap key is the negated size
        # Once the batch has at least one SCC, stop before overflowing it.
        if taken and total + next_size > max_size_in_batch:
            break
        neg_size, _, component = heappop(queue)
        total += -neg_size
        self.size_in_queue += neg_size  # i.e. subtract this SCC's size
        taken.append(component)
    return taken
| 1432 | |
| 1433 | def max_batch_size(self) -> int: |
| 1434 | batch_frac = 1 / len(self.workers) |
no test coverage detected