MCPcopy
hub / github.com/aio-libs/aiohttp / test_ping_pong

Function test_ping_pong

tests/test_websocket_client_functional.py:56–89  ·  view source on GitHub ↗
(create_app_and_client, loop)

Source from the content-addressed store, hash-verified

54
55@pytest.mark.run_loop
56def test_ping_pong(create_app_and_client, loop):
57
58 closed = asyncio.Future(loop=loop)
59
60 @asyncio.coroutine
61 def handler(request):
62 ws = web.WebSocketResponse()
63 yield from ws.prepare(request)
64
65 msg = yield from ws.receive_bytes()
66 ws.ping()
67 ws.send_bytes(msg+b'/answer')
68 try:
69 yield from ws.close()
70 finally:
71 closed.set_result(1)
72 return ws
73
74 app, client = yield from create_app_and_client()
75 app.router.add_route('GET', '/', handler)
76 resp = yield from client.ws_connect('/')
77
78 resp.ping()
79 resp.send_bytes(b'ask')
80
81 msg = yield from resp.receive()
82 assert msg.tp == aiohttp.MsgType.binary
83 assert msg.data == b'ask/answer'
84
85 msg = yield from resp.receive()
86 assert msg.tp == aiohttp.MsgType.close
87
88 yield from resp.close()
89 yield from closed
90
91
92@pytest.mark.run_loop

Callers

nothing calls this directly

Calls 7

create_app_and_clientFunction · 0.85
add_routeMethod · 0.45
ws_connectMethod · 0.45
pingMethod · 0.45
send_bytesMethod · 0.45
receiveMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected