hub / github.com/encode/starlette / cancel_on_disconnect

Function cancel_on_disconnect

tests/middleware/test_base.py:512–521
(
    *,
    task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
)

Source from the content-addressed store, hash-verified

510 async with anyio.create_task_group() as task_group:
511
512     async def cancel_on_disconnect(
513         *,
514         task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
515     ) -> None:
516         task_status.started()
517         while True:
518             message = await receive()
519             if message["type"] == "http.disconnect":  # pragma: no branch
520                 task_group.cancel_scope.cancel()
521                 break
522
523     # Using start instead of start_soon to ensure that
524     # cancel_on_disconnect is scheduled by the event loop

Callers

nothing calls this directly

Calls 1

receive · Function · 0.70

Tested by

no test coverage detected