Main server loop of the worker. Receive the initial state from the coordinator, then process each SCC checking request and reply to the client (the coordinator). See the module docstring for more details on the protocol.
(server: IPCServer, ctx: ServerContext)
| 148 | |
| 149 | |
| 150 | def serve(server: IPCServer, ctx: ServerContext) -> None: |
| 151 | """Main server loop of the worker. |
| 152 | |
| 153 | Receive initial state from the coordinator, then process each |
| 154 | SCC checking request and reply to client (coordinator). See module |
| 155 | docstring for more details on the protocol. |
| 156 | """ |
| 157 | buf = receive(server) |
| 158 | if should_shutdown(buf, SOURCES_DATA_MESSAGE): |
| 159 | return |
| 160 | sources = SourcesDataMessage.read(buf).sources |
| 161 | manager = setup_worker_manager(sources, ctx) |
| 162 | if manager is None: |
| 163 | return |
| 164 | |
| 165 | # Notify coordinator we are done with setup. |
| 166 | send(server, AckMessage()) |
| 167 | buf = receive(server) |
| 168 | if should_shutdown(buf, GRAPH_MESSAGE): |
| 169 | return |
| 170 | |
| 171 | # Disable GC before loading graph and SCC structure, these create a bunch |
| 172 | # of small objects that will stay around until the end of the build. |
| 173 | if platform.python_implementation() == "CPython": |
| 174 | gc.disable() |
| 175 | |
| 176 | graph_data = GraphMessage.read(buf, manager) |
| 177 | # Update some manager data in-place as it has been passed to semantic analyzer. |
| 178 | manager.missing_modules |= graph_data.missing_modules |
| 179 | graph = graph_data.graph |
| 180 | for id in graph: |
| 181 | manager.import_map[id] = graph[id].dependencies_set |
| 182 | # Link modules dicts, so that plugins will get access to ASTs as we parse them. |
| 183 | manager.plugin.set_modules(manager.modules) |
| 184 | |
| 185 | # Notify coordinator we are ready to receive computed graph SCC structure. |
| 186 | send(server, AckMessage()) |
| 187 | buf = receive(server) |
| 188 | if should_shutdown(buf, SCCS_DATA_MESSAGE): |
| 189 | return |
| 190 | sccs = SccsDataMessage.read(buf).sccs |
| 191 | manager.scc_by_id = {scc.id: scc for scc in sccs} |
| 192 | manager.top_order = [scc.id for scc in sccs] |
| 193 | |
| 194 | if platform.python_implementation() == "CPython": |
| 195 | gc.freeze() |
| 196 | gc.enable() |
| 197 | |
| 198 | # Notify coordinator we are ready to start processing SCCs. |
| 199 | send(server, AckMessage()) |
| 200 | while True: |
| 201 | t0 = time.time() |
| 202 | ready_to_read([server], WORKER_IDLE_TIMEOUT) |
| 203 | t1 = time.time() |
| 204 | buf = receive(server) |
| 205 | assert read_tag(buf) == SCC_REQUEST_MESSAGE |
| 206 | scc_message = SccRequestMessage.read(buf) |
| 207 | manager.add_stats(scc_wait_time=t1 - t0, scc_receive_time=time.time() - t1) |
no test coverage detected
searching dependent graphs…