(test_client_factory: TestClientFactory)
| 26 | |
| 27 | |
def test_sync_task(test_client_factory: TestClientFactory) -> None:
    """Verify a synchronous callable attached as a background task gets run.

    A plain (non-async) function is wrapped in ``BackgroundTask`` and handed
    to a ``Response``. After the test client's request returns, the response
    body must be intact and the function must have flipped its flag.
    """
    task_finished = False

    def mark_finished() -> None:
        # Rebind the flag in the enclosing test scope so the final assertion
        # can observe that this callable was invoked.
        nonlocal task_finished
        task_finished = True

    background_job = BackgroundTask(mark_finished)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Minimal ASGI app: one plain-text response carrying the background job.
        reply = Response(
            "task initiated",
            media_type="text/plain",
            background=background_job,
        )
        await reply(scope, receive, send)

    http_client = test_client_factory(app)
    result = http_client.get("/")

    assert result.text == "task initiated"
    assert task_finished
| 45 | |
| 46 | |
| 47 | def test_multiple_tasks(test_client_factory: TestClientFactory) -> None: |
nothing calls this directly
no test coverage detected