hub / github.com/aio-libs/aiohttp / test_exception_waiter

Method test_exception_waiter

tests/test_streams.py:425–436
(self)

Source from the content-addressed store, hash-verified

423         self.assertIs(stream.exception(), exc)
424
425     def test_exception_waiter(self):
426         stream = self._make_one()
427
428         @asyncio.coroutine
429         def set_err():
430             stream.set_exception(ValueError())
431
432         t1 = asyncio.Task(stream.readline(), loop=self.loop)
433         t2 = asyncio.Task(set_err(), loop=self.loop)
434
435         self.loop.run_until_complete(asyncio.wait([t1, t2], loop=self.loop))
436         self.assertRaises(ValueError, t1.result)
437
438     def test_exception_cancel(self):
439         stream = self._make_one()
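
The test above checks that a reader already waiting in readline() receives the exception passed to set_exception(). The sketch below shows that pattern in isolation, using async/await rather than @asyncio.coroutine and the loop= argument, both of which newer Python versions have removed. TinyStream and its _waiter attribute are hypothetical stand-ins written for this illustration. They are not aiohttp's implementation, and the real stream class may handle waiters differently.

```python
import asyncio


class TinyStream:
    """Hypothetical stand-in for a stream reader; not aiohttp's class."""

    def __init__(self):
        self._exception = None
        self._waiter = None  # future that a pending reader is parked on

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        # Remember the error and wake any reader that is currently waiting.
        self._exception = exc
        waiter, self._waiter = self._waiter, None
        if waiter is not None and not waiter.done():
            waiter.set_exception(exc)

    async def readline(self):
        # Fail fast if an error was recorded before the read started.
        if self._exception is not None:
            raise self._exception
        # No data ever arrives in this sketch, so the reader always parks.
        self._waiter = asyncio.get_running_loop().create_future()
        return await self._waiter


async def demo():
    stream = TinyStream()

    async def set_err():
        stream.set_exception(ValueError())

    # Same shape as the test: one task reads, the other injects the error.
    t1 = asyncio.create_task(stream.readline())
    t2 = asyncio.create_task(set_err())
    await asyncio.wait([t1, t2])
    return t1.exception()


err = asyncio.run(demo())
```

Under these assumptions, err holds the ValueError that was delivered to the waiting reader, which mirrors the assertRaises(ValueError, t1.result) check in the original test.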

Callers

nothing calls this directly

Calls 3

_make_one · Method · 0.95
wait · Method · 0.80
readline · Method · 0.45

Tested by

no test coverage detected