| 494 | return PlainTextResponse("Custom") |
| 495 | |
async def downstream_app(
    scope: Scope,
    receive: Receive,
    send: Send,
) -> None:
    """ASGI app that streams body chunks until the client disconnects.

    Sends a 200 ``text/plain`` response start, then keeps sending
    ``b"chunk "`` body messages while a background task reads from
    ``receive`` and cancels the task group as soon as an
    ``http.disconnect`` message arrives. If the 0.1 second safety timeout
    elapses before that happens, the test is failed explicitly.
    """
    response_start = {
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/plain"),
        ],
    }
    await send(response_start)

    async with anyio.create_task_group() as tg:

        async def cancel_on_disconnect(
            *,
            task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
        ) -> None:
            # Report readiness before the first ``receive`` so that
            # ``tg.start`` returns once this watcher is actually running.
            task_status.started()
            while True:
                event = await receive()
                if event["type"] == "http.disconnect":  # pragma: no branch
                    tg.cancel_scope.cancel()
                    return

        # ``start`` is used rather than ``start_soon`` so that the
        # disconnect watcher is guaranteed to have been scheduled by the
        # event loop before any body chunk is sent.
        await tg.start(cancel_on_disconnect)

        # The 0.1 second timeout bounds the otherwise endless streaming
        # loop, so a missing disconnect can never hang the test run.
        with anyio.move_on_after(0.1):
            while True:
                # A fresh message dict is built for every send.
                body_chunk = {
                    "type": "http.response.body",
                    "body": b"chunk ",
                    "more_body": True,
                }
                await send(body_chunk)

        # Only reached when the timeout expired, i.e. the task group was
        # never cancelled by the disconnect watcher.
        pytest.fail("http.disconnect should have been received and canceled the scope")  # pragma: no cover
| 541 | |
| 542 | app = DiscardingMiddleware(downstream_app) |
| 543 | |