Wrap a generator-based lifespan context into an async context manager. This is vendored from Starlette to avoid importing private symbols.
(
lifespan_context: Callable[[Any], Generator[Any, Any, Any]],
)
| 206 | |
| 207 | # Vendored from starlette.routing to avoid importing private symbols |
def _wrap_gen_lifespan_context(
    lifespan_context: Callable[[Any], Generator[Any, Any, Any]],
) -> Callable[[Any], AbstractAsyncContextManager[Any]]:
    """
    Adapt a generator-based lifespan into an async context manager factory.

    The generator function is first turned into a regular context manager
    factory with ``contextlib.contextmanager``. Each call of the returned
    callable builds that context manager for ``app`` and hands it to
    ``_AsyncLiftContextManager``.

    Vendored from Starlette so that no private Starlette symbol is imported.
    """
    sync_factory = contextlib.contextmanager(lifespan_context)

    def lifted(app: Any) -> _AsyncLiftContextManager[Any]:
        sync_cm = sync_factory(app)
        return _AsyncLiftContextManager(sync_cm)

    # Non-decorator spelling of ``@functools.wraps(sync_factory)``: copies the
    # wrapped factory's metadata onto ``lifted`` and returns ``lifted``.
    return functools.update_wrapper(lifted, sync_factory)
| 223 | |
| 224 | |
| 225 | def _merge_lifespan_context( |
no outgoing calls
no test coverage detected
searching dependent graphs…