MCPcopy
hub / github.com/aio-libs/aiohttp / test_read_until_eof

Method test_read_until_eof

tests/test_streams.py:156–171  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

154 self.assertTrue(internal_logger.warning.called)
155
def test_read_until_eof(self):
    # read(-1) must yield every byte fed before EOF; a later read() yields b''.
    reader = self._make_one()
    # The read task is created before any data is fed to the stream.
    pending_read = asyncio.Task(reader.read(-1), loop=self.loop)

    def feed_then_close():
        # Feed both chunks, then signal end of stream.
        for piece in (b'chunk1\n', b'chunk2'):
            reader.feed_data(piece)
        reader.feed_eof()

    # Scheduled on the loop; it runs once run_until_complete drives the loop.
    self.loop.call_soon(feed_then_close)

    everything = self.loop.run_until_complete(pending_read)
    self.assertEqual(b'chunk1\nchunk2', everything)

    # Reading again after EOF is expected to give an empty bytes object.
    after_eof = self.loop.run_until_complete(reader.read())
    self.assertEqual(b'', after_eof)
172
173 def test_read_exception(self):
174 stream = self._make_one()

Callers

nothing calls this directly

Calls 2

_make_oneMethod · 0.95
readMethod · 0.45

Tested by

no test coverage detected