MCPcopy
hub / github.com/aio-libs/aiohttp / test_close

Method test_close

tests/test_websocket_client.py:215–246  ·  view source on GitHub ↗
(self, m_req, m_os, WebSocketWriter)

Source from the content-addressed store, hash-verified

213 @mock.patch('aiohttp.client.os')
214 @mock.patch('aiohttp.client.ClientSession.get')
def test_close(self, m_req, m_os, WebSocketWriter):
    """Closing a connected client websocket succeeds once, then is a no-op.

    The first ``close()`` must send a close frame with code 1000 and an
    empty message through the writer, mark the websocket as closed,
    return a truthy result and leave ``exception()`` as ``None``.
    A second ``close()`` must return a falsy result without calling the
    writer again.
    """
    # Fake upgrade response handed back by the patched ``ClientSession.get``.
    # NOTE(review): ``self.ws_key`` / ``self.key_data`` come from fixtures
    # defined outside this method -- presumably a matching key pair.
    handshake = mock.Mock()
    handshake.status = 101
    handshake.headers = {
        hdrs.UPGRADE: hdrs.WEBSOCKET,
        hdrs.CONNECTION: hdrs.UPGRADE,
        hdrs.SEC_WEBSOCKET_ACCEPT: self.ws_key,
    }
    m_os.urandom.return_value = self.key_data

    # The patched request returns an already-resolved future.
    handshake_fut = asyncio.Future(loop=self.loop)
    m_req.return_value = handshake_fut
    handshake_fut.set_result(handshake)

    # Stand-ins for the frame writer and the parsed-message reader.
    writer = mock.Mock()
    WebSocketWriter.return_value = writer
    reader = mock.Mock()
    handshake.connection.reader.set_parser.return_value = reader

    ws = self.loop.run_until_complete(
        aiohttp.ws_connect('http://test.org', loop=self.loop))
    self.assertFalse(ws.closed)

    # The next read yields a close message from the peer.
    close_msg = websocket.Message(websocket.MSG_CLOSE, b'', b'')
    close_fut = asyncio.Future(loop=self.loop)
    reader.read.return_value = close_fut
    close_fut.set_result(close_msg)

    first_result = self.loop.run_until_complete(ws.close())
    writer.close.assert_called_with(1000, b'')
    self.assertTrue(ws.closed)
    self.assertTrue(first_result)
    self.assertIsNone(ws.exception())

    # idempotent: closing again reports False and sends nothing more
    second_result = self.loop.run_until_complete(ws.close())
    self.assertFalse(second_result)
    self.assertEqual(writer.close.call_count, 1)
247
248 @mock.patch('aiohttp.client.WebSocketWriter')
249 @mock.patch('aiohttp.client.os')

Callers

nothing calls this directly

Calls 3

ws_connectMethod · 0.45
closeMethod · 0.45
exceptionMethod · 0.45

Tested by

no test coverage detected