(self)
| 967 | self._reactorless_loop.run_until_complete(self._reactorless_main_task) |
| 968 | |
def _close_loop(self) -> None:
    """Shut down and close the reactorless event loop.

    Similar to ``asyncio.runners.Runner.close()``: the remaining tasks are
    handed to ``_cancel_all_tasks``, async generators are finalized and the
    default executor is shut down. Whether or not those steps succeed, the
    main-task reference is cleared, the current event loop is unset, the
    loop is closed and the stored loop reference is dropped.
    """
    event_loop = self._reactorless_loop
    assert event_loop
    try:
        self._cancel_all_tasks(event_loop)
        # Each shutdown coroutine is created only after the previous step
        # has finished running, then driven to completion on the loop.
        asyncgens_shutdown = event_loop.shutdown_asyncgens()
        event_loop.run_until_complete(asyncgens_shutdown)
        executor_shutdown = event_loop.shutdown_default_executor()
        event_loop.run_until_complete(executor_shutdown)
    finally:
        # Order matters: forget the main task and detach the loop from
        # asyncio's "current loop" slot before closing it, and only drop
        # our own reference once close() has returned.
        self._reactorless_main_task = None
        asyncio.set_event_loop(None)
        event_loop.close()
        self._reactorless_loop = None
| 982 | |
| 983 | @staticmethod |
| 984 | def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None: |
no test coverage detected