(self)
| 291 | self.loop.run_until_complete(go()) |
| 292 | |
| 293 | def test_receive_cancelled(self): |
| 294 | req = self.make_request('GET', '/') |
| 295 | ws = WebSocketResponse() |
| 296 | self.loop.run_until_complete(ws.prepare(req)) |
| 297 | |
| 298 | res = asyncio.Future(loop=self.loop) |
| 299 | res.set_exception(asyncio.CancelledError()) |
| 300 | ws._reader.read.return_value = res |
| 301 | |
| 302 | self.assertRaises( |
| 303 | asyncio.CancelledError, |
| 304 | self.loop.run_until_complete, ws.receive()) |
| 305 | |
| 306 | def test_receive_timeouterror(self): |
| 307 | req = self.make_request('GET', '/') |
nothing calls this directly
no test coverage detected