(self, graph: Graph, sccs: list[SCC] | None = None)
| 1439 | return int(self.size_in_queue * batch_frac) |
| 1440 | |
def submit_to_workers(self, graph: Graph, sccs: list[SCC] | None = None) -> None:
    """Queue any newly supplied SCCs, then hand batches to free workers.

    Each SCC in ``sccs`` is pushed onto ``self.scc_queue`` with the key
    ``(-size_hint, queue_order)``, and ``self.size_in_queue`` and
    ``self.queue_order`` are updated to match. Afterwards, while there are
    both queued SCCs and free workers, one worker is taken and sent a single
    ``SccRequestMessage`` describing one batch obtained from
    ``self.get_scc_batch()``.

    Args:
        graph: Mapping from module id to its state; used to look up each
            module's ``xpath``, ``tree`` and ``suppressed_deps_opts()``.
        sccs: SCCs to enqueue before dispatching. ``None`` means there is
            nothing new to enqueue; already queued SCCs are still dispatched.
    """
    for scc in sccs or ():
        # Negated size makes the min-heap order larger SCCs first; the
        # running counter breaks ties between equal sizes by insertion order.
        entry = (-scc.size_hint, self.queue_order, scc)
        heappush(self.scc_queue, entry)
        self.size_in_queue += scc.size_hint
        self.queue_order += 1

    # The size limit is computed once and shared by every batch built below.
    batch_limit = self.max_batch_size()
    while self.scc_queue and self.free_workers:
        worker_idx = self.free_workers.pop()
        batch = self.get_scc_batch(batch_limit)

        # Forward only the recorded errors that belong to modules in this batch.
        recorded = self.errors.recorded
        import_errors = {}
        for scc in batch:
            for mod_id in scc.mod_ids:
                path = graph[mod_id].xpath
                if path in recorded:
                    import_errors[mod_id] = recorded[path]

        # Everything from here to the stats update counts as send time,
        # including assembling the per-module payload.
        started = time.time()
        conn = self.workers[worker_idx].conn
        scc_ids = [scc.id for scc in batch]
        mod_data = {}
        for scc in batch:
            for mod_id in scc.mod_ids:
                state = graph[mod_id]
                # Although workers don't really need to know about details
                # of dependencies, they will write cache, so we need to pass
                # suppressed_deps_opts() as part of module data.
                deps_opts = state.suppressed_deps_opts()
                tree = state.tree
                mod_data[mod_id] = (deps_opts, tree.raw_data if tree else None)
        request = SccRequestMessage(
            scc_ids=scc_ids,
            import_errors=import_errors,
            mod_data=mod_data,
        )
        send(conn, request)
        self.add_stats(scc_requests_sent=1, scc_send_time=time.time() - started)
| 1477 | |
| 1478 | def wait_for_done(self, graph: Graph) -> tuple[list[SCC], bool, dict[str, ModuleResult]]: |
| 1479 | """Wait for a stale SCC processing to finish. |
no test coverage detected