(self)
| 123 | self._start() |
| 124 | |
def _start(self) -> None:
    """Create the task/result queues and launch the worker process.

    The multiprocessing context is "forkserver" on Linux and "spawn" on
    every other platform. The current ``sys.path`` is handed to the
    worker along with both queues.
    """
    start_method = "forkserver" if sys.platform == "linux" else "spawn"
    ctx = get_context(start_method)

    # Both queues come from the same context as the process that uses them.
    self.tasks: Queue[str] = ctx.Queue()
    self.results: Queue[ModuleProperties | str] = ctx.Queue()

    worker_args = (self.tasks, self.results, sys.path)
    self.proc = ctx.Process(target=worker, args=worker_args)
    self.proc.start()

    # Number of successful roundtrips
    self.counter = 0
| 135 | |
| 136 | def close(self) -> None: |
| 137 | """Free any resources used.""" |
no outgoing calls
no test coverage detected