(self, **kwargs: Any)
| 326 | """ |
| 327 | |
def initialize(self, **kwargs: Any) -> None:  # type: ignore
    """Initialize this loop, creating an asyncio event loop when none is given.

    If the caller did not pass ``asyncio_loop`` in ``kwargs``, a new loop
    is created with ``asyncio.new_event_loop()`` and injected under that
    key before delegating to the parent ``initialize()``.

    Any ``Exception`` raised by the parent ``initialize()`` is re-raised
    unchanged; a loop created here is closed first so it is not leaked.
    """
    self.is_current = False

    # Only a loop created in this method is tracked; a caller-supplied
    # loop is never closed from here.
    created_loop = None if "asyncio_loop" in kwargs else asyncio.new_event_loop()
    if created_loop is not None:
        kwargs["asyncio_loop"] = created_loop

    try:
        super().initialize(**kwargs)
    except Exception:
        # The parent initialize() did not succeed (and so did not take
        # ownership of the loop), so the loop we created must be closed
        # here before propagating the error.
        if created_loop is not None:
            created_loop.close()
        raise
| 341 | |
| 342 | def close(self, all_fds: bool = False) -> None: |
| 343 | if self.is_current: |
no test coverage detected