Stop the dirty arbiter.
(self)
| 256 | time.sleep(0.5) |
| 257 | |
def stop(self):
    """Stop the dirty arbiter.

    Sends SIGTERM to the process recorded in ``self.arbiter_pid`` (if any)
    and waits for it to exit, then removes the temporary directory recorded
    in ``self._tmpdir`` (if any). Both steps are best-effort: OS-level
    errors are ignored rather than raised. Afterwards both attributes are
    reset to ``None``, so a second call does nothing.
    """
    # Function-scope import so this method does not depend on the module's
    # top-level import list.
    import shutil

    if self.arbiter_pid:
        try:
            os.kill(self.arbiter_pid, signal.SIGTERM)
            # Blocks until the process has exited and been reaped.
            os.waitpid(self.arbiter_pid, 0)
        except (OSError, ChildProcessError):
            # The process is already gone or is not our child; there is
            # nothing further to clean up for it.
            pass
        self.arbiter_pid = None

    # Cleanup temp directory
    if self._tmpdir:
        # rmtree removes nested directories as well as plain files and, with
        # ignore_errors=True, keeps going after a failed entry instead of
        # abandoning the rest of the cleanup on the first error.
        shutil.rmtree(self._tmpdir, ignore_errors=True)
        self._tmpdir = None
| 277 | |
| 278 | def warmup(self, requests: int = 10): |
| 279 | """Warm up the pool with a few requests.""" |
no test coverage detected