Mock ASGI application for testing.
| 63 | |
| 64 | |
| 65 | class FakeApp: |
| 66 | """Mock ASGI application for testing.""" |
| 67 | |
| 68 | def __init__(self): |
| 69 | self.calls = [] |
| 70 | |
| 71 | def wsgi(self): |
| 72 | return self.asgi_app |
| 73 | |
| 74 | async def asgi_app(self, scope, receive, send): |
| 75 | self.calls.append(scope) |
| 76 | if scope["type"] == "lifespan": |
| 77 | while True: |
| 78 | message = await receive() |
| 79 | if message["type"] == "lifespan.startup": |
| 80 | await send({"type": "lifespan.startup.complete"}) |
| 81 | elif message["type"] == "lifespan.shutdown": |
| 82 | await send({"type": "lifespan.shutdown.complete"}) |
| 83 | return |
| 84 | elif scope["type"] == "http": |
| 85 | await send({ |
| 86 | "type": "http.response.start", |
| 87 | "status": 200, |
| 88 | "headers": [(b"content-type", b"text/plain")], |
| 89 | }) |
| 90 | await send({ |
| 91 | "type": "http.response.body", |
| 92 | "body": b"Hello from ASGI!", |
| 93 | }) |
| 94 | |
| 95 | |
| 96 | class FakeListener: |
no outgoing calls