(self)
| 113 | self.extra_headers = None |
| 114 | |
def __enter__(self) -> Self:
    """Open the session: start the runner task and perform the connect handshake.

    Cleanup steps are registered on an ``ExitStack`` as setup progresses.
    If any step raises before the handshake completes, the ``with`` block
    unwinds whatever was registered so far.  On success the registered
    cleanup is moved to ``self.exit_stack`` so that ``__exit__`` can run it
    later.

    Returns:
        This session object.
    """
    with contextlib.ExitStack() as cleanup:
        blocking_portal = cleanup.enter_context(self.portal_factory())
        self.portal = blocking_portal

        # ``start_task`` yields a future plus a second object exposing
        # ``cancel`` (presumably a cancel scope -- confirm against the
        # portal implementation).
        task_future, cancel_scope = blocking_portal.start_task(self._run)

        # ExitStack runs callbacks last-in-first-out, so on unwind the
        # cancel request is issued through the portal before the task's
        # result is collected.
        cleanup.callback(task_future.result)
        cleanup.callback(blocking_portal.call, cancel_scope.cancel)

        # Handshake: send the connect event and inspect the first reply.
        self.send({"type": "websocket.connect"})
        reply = self.receive()
        self._raise_on_close(reply)
        self.accepted_subprotocol = reply.get("subprotocol")
        self.extra_headers = reply.get("headers")

        # Registered last, so ``close(1000)`` is the first cleanup step to run.
        cleanup.callback(self.close, 1000)

        # Detach everything registered above; leaving this ``with`` block
        # is now a no-op and teardown is deferred to ``__exit__``.
        self.exit_stack = cleanup.pop_all()
    return self
| 129 | |
def __exit__(self, *args: Any) -> bool | None:
    """Tear the session down by unwinding the stack saved in ``__enter__``.

    Args:
        *args: The exception triple supplied by the ``with`` statement,
            forwarded unchanged to the stored exit stack.

    Returns:
        Whatever ``self.exit_stack.__exit__`` returns.
    """
    outcome = self.exit_stack.__exit__(*args)
    return outcome
nothing calls this directly
no test coverage detected