(self)
| 113 | self.extra_headers = None |
| 114 | |
| 115 | def __enter__(self) -> Self: |
| 116 | with contextlib.ExitStack() as stack: |
| 117 | self.portal = portal = stack.enter_context(self.portal_factory()) |
| 118 | fut, cs = portal.start_task(self._run) |
| 119 | stack.callback(fut.result) |
| 120 | stack.callback(portal.call, cs.cancel) |
| 121 | self.send({class="st">"type": class="st">"websocket.connect"}) |
| 122 | message = self.receive() |
| 123 | self._raise_on_close(message) |
| 124 | self.accepted_subprotocol = message.get(class="st">"subprotocol", None) |
| 125 | self.extra_headers = message.get(class="st">"headers", None) |
| 126 | stack.callback(self.close, 1000) |
| 127 | self.exit_stack = stack.pop_all() |
| 128 | return self |
| 129 | |
| 130 | def __exit__(self, *args: Any) -> bool | None: |
| 131 | return self.exit_stack.__exit__(*args) |
nothing calls this directly
no test coverage detected