(self, m_req, m_os, WebSocketWriter)
| 213 | @mock.patch('aiohttp.client.os') |
| 214 | @mock.patch('aiohttp.client.ClientSession.get') |
def test_close(self, m_req, m_os, WebSocketWriter):
    """Closing a connected client websocket succeeds once and is idempotent.

    The first ``close()`` must call the writer's ``close`` with code 1000
    and an empty message, mark the websocket as closed, return a truthy
    result and leave ``exception()`` as ``None``. A second ``close()``
    must return a falsy result without calling the writer again.
    """
    run = self.loop.run_until_complete

    m_os.urandom.return_value = self.key_data

    # Mocked HTTP response carrying a successful upgrade handshake.
    handshake = mock.Mock()
    handshake.status = 101
    handshake.headers = {
        hdrs.UPGRADE: hdrs.WEBSOCKET,
        hdrs.CONNECTION: hdrs.UPGRADE,
        hdrs.SEC_WEBSOCKET_ACCEPT: self.ws_key,
    }

    # Stand-ins for the websocket writer/reader pair.
    ws_writer = mock.Mock()
    WebSocketWriter.return_value = ws_writer
    ws_reader = mock.Mock()
    handshake.connection.reader.set_parser.return_value = ws_reader

    # The patched request call hands back an already-resolved future.
    pending_response = asyncio.Future(loop=self.loop)
    pending_response.set_result(handshake)
    m_req.return_value = pending_response

    ws = run(aiohttp.ws_connect('http://test.org', loop=self.loop))
    self.assertFalse(ws.closed)

    # The next read from the reader yields a CLOSE message.
    close_frame = websocket.Message(websocket.MSG_CLOSE, b'', b'')
    next_read = asyncio.Future(loop=self.loop)
    next_read.set_result(close_frame)
    ws_reader.read.return_value = next_read

    first_result = run(ws.close())
    ws_writer.close.assert_called_with(1000, b'')
    self.assertTrue(ws.closed)
    self.assertTrue(first_result)
    self.assertIsNone(ws.exception())

    # idempotent: a repeated close reports False and writes nothing more
    second_result = run(ws.close())
    self.assertFalse(second_result)
    self.assertEqual(ws_writer.close.call_count, 1)
| 247 | |
| 248 | @mock.patch('aiohttp.client.WebSocketWriter') |
| 249 | @mock.patch('aiohttp.client.os') |
nothing calls this directly
no test coverage detected