Run sync function in greenlet. Support nested calls
(
self, fn: Callable[..., _T], *args: Any, **kwargs: Any
)
| 303 | return self.runner.run(fn(*args, **kwargs)) |
| 304 | |
| 305 | def run_in_greenlet( |
| 306 | self, fn: Callable[..., _T], *args: Any, **kwargs: Any |
| 307 | ) -> _T: |
| 308 | """Run sync function in greenlet. Support nested calls""" |
| 309 | _concurrency_shim._initialize(raise_=False) |
| 310 | |
| 311 | if _concurrency_shim._has_greenlet: |
| 312 | if self.runner.get_loop().is_running(): |
| 313 | # allow for a wrapped test function to call another |
| 314 | assert in_greenlet() |
| 315 | return fn(*args, **kwargs) |
| 316 | else: |
| 317 | return self.runner.run(greenlet_spawn(fn, *args, **kwargs)) |
| 318 | else: |
| 319 | return fn(*args, **kwargs) |
| 320 | |
| 321 | def close(self) -> None: |
| 322 | self.runner.close() |
no test coverage detected