hub / github.com/aio-libs/aiohttp / test_wait_eof

Method test_wait_eof

tests/test_streams.py:50–61
(self)

Source from the content-addressed store, hash-verified

48          self.assertTrue(stream.at_eof())
49
50      def test_wait_eof(self):
51          stream = self._make_one()
52          wait_task = asyncio.Task(stream.wait_eof(), loop=self.loop)
53
54          def cb():
55              yield from asyncio.sleep(0.1, loop=self.loop)
56              stream.feed_eof()
57
58          asyncio.Task(cb(), loop=self.loop)
59          self.loop.run_until_complete(wait_task)
60          self.assertTrue(stream.is_eof())
61          self.assertIsNone(stream._eof_waiter)
62
63      def test_wait_eof_eof(self):
64          stream = self._make_one()
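
The listing above is written in the older generator-based style (`yield from`, explicit `loop=` arguments); `asyncio.sleep` has not accepted `loop=` since Python 3.10. As a rough illustration of the same wait-then-feed pattern in `async`/`await` form, here is a minimal sketch. `TinyStream` and `check_wait_eof` are hypothetical names invented for this example; `TinyStream` is a stand-in that only mimics the four members the test touches and is not aiohttp's stream class.

```python
import asyncio


class TinyStream:
    """Hypothetical stand-in for the stream under test (not aiohttp's class)."""

    def __init__(self):
        self._eof = False
        self._eof_waiter = None

    def is_eof(self):
        return self._eof

    def feed_eof(self):
        # Mark end-of-stream and wake a pending wait_eof() call, if any.
        self._eof = True
        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    async def wait_eof(self):
        if self._eof:
            return
        self._eof_waiter = asyncio.get_running_loop().create_future()
        try:
            await self._eof_waiter
        finally:
            # Always clear the waiter, even if the wait is cancelled.
            self._eof_waiter = None


async def check_wait_eof():
    stream = TinyStream()
    # Start waiting for EOF before any EOF has been fed.
    wait_task = asyncio.create_task(stream.wait_eof())

    async def cb():
        # Feed EOF after a short delay, as the original callback does.
        await asyncio.sleep(0.01)
        stream.feed_eof()

    feeder = asyncio.create_task(cb())
    await wait_task
    await feeder
    return stream.is_eof(), stream._eof_waiter


result = asyncio.run(check_wait_eof())
```

The two values returned by `check_wait_eof` correspond to the two assertions at the end of the original test: the stream reports EOF, and the internal waiter has been cleared.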

Callers

nothing calls this directly

Calls 4

_make_one · Method · 0.95
cb · Function · 0.70
wait_eof · Method · 0.45
is_eof · Method · 0.45

Tested by

no test coverage detected