MCPcopy
hub / github.com/aio-libs/aiohttp / test_close_manual

Function test_close_manual

tests/test_websocket_client_functional.py:197–231  ·  view source on GitHub ↗
(create_app_and_client, loop)

Source from the content-addressed store, hash-verified

195
196@pytest.mark.run_loop
197def test_close_manual(create_app_and_client, loop):
198
199 closed = asyncio.Future(loop=loop)
200
201 @asyncio.coroutine
202 def handler(request):
203 ws = web.WebSocketResponse()
204 yield from ws.prepare(request)
205
206 yield from ws.receive_bytes()
207 ws.send_str('test')
208
209 try:
210 yield from ws.close()
211 finally:
212 closed.set_result(1)
213 return ws
214
215 app, client = yield from create_app_and_client()
216 app.router.add_route('GET', '/', handler)
217 resp = yield from client.ws_connect('/', autoclose=False)
218 resp.send_bytes(b'ask')
219
220 msg = yield from resp.receive()
221 assert msg.data == 'test'
222
223 msg = yield from resp.receive()
224 assert msg.tp == aiohttp.MsgType.close
225 assert msg.data == 1000
226 assert msg.extra == ''
227 assert not resp.closed
228
229 yield from resp.close()
230 yield from closed
231 assert resp.closed
232
233
234@pytest.mark.run_loop

Callers

nothing calls this directly

Calls 6

create_app_and_clientFunction · 0.85
add_routeMethod · 0.45
ws_connectMethod · 0.45
send_bytesMethod · 0.45
receiveMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected