(self)
| 1013 | loop.call_soon_threadsafe(self._create_shutdown_task) |
| 1014 | |
| 1015 | def _create_shutdown_task(self) -> None: |
| 1016 | assert self._reactorless_loop |
| 1017 | coro = self._shutdown_graceful_reactorless() |
| 1018 | try: |
| 1019 | self._reactorless_loop.create_task(coro) |
| 1020 | except RuntimeError: |
| 1021 | coro.close() |
| 1022 | |
| 1023 | async def _shutdown_graceful_reactorless(self) -> None: |
| 1024 | await self.stop() |
nothing calls this directly
no test coverage detected