()
| 98 | |
@asyncio.coroutine
def go():
    """Run the client side of one websocket ask/answer exchange.

    Starts a server for ``GET /`` using ``handler``, connects a websocket
    client to it, sends ``'ask'`` and checks that the first frame read back
    is the text ``'ask/answer'`` and the second is a close frame carrying
    code 1000 with an empty ``extra``. Afterwards the writer is closed,
    ``closed`` is awaited and the response is closed.

    ``self``, ``handler`` and ``closed`` are taken from the enclosing scope,
    which is not visible in this block.
    """
    _, _, url = yield from self.create_server('GET', '/', handler)
    response, reader, writer = yield from self.connect_ws(url)

    # First round trip: one text frame out, one text frame back.
    writer.send('ask')
    reply = yield from reader.read()
    self.assertEqual(reply.tp, websocket.MSG_TEXT)
    self.assertEqual('ask/answer', reply.data)

    # The very next frame must be the close frame.
    closing = yield from reader.read()
    self.assertEqual(closing.tp, websocket.MSG_CLOSE)
    self.assertEqual(closing.data, 1000)
    self.assertEqual(closing.extra, '')

    writer.close()

    # NOTE(review): `closed` is presumably a future completed by the
    # server-side handler once it has finished -- confirm against the
    # enclosing test method.
    yield from closed
    response.close()
| 117 | |
| 118 | self.loop.run_until_complete(go()) |
| 119 |
nothing calls this directly
no test coverage detected