MCPcopy Index your code
hub / github.com/python/cpython / test_read_until_eof

Method test_read_until_eof

Lib/test/test_asyncio/test_streams.py:182–196  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

180 self.assertEqual(b'', stream._buffer)
181
182 def test_read_until_eof(self):
183 # Read all bytes until eof.
184 stream = asyncio.StreamReader(loop=self.loop)
185 read_task = self.loop.create_task(stream.read(-1))
186
187 def cb():
188 stream.feed_data(b'chunk1\n')
189 stream.feed_data(b'chunk2')
190 stream.feed_eof()
191 self.loop.call_soon(cb)
192
193 data = self.loop.run_until_complete(read_task)
194
195 self.assertEqual(b'chunk1\nchunk2', data)
196 self.assertEqual(b'', stream._buffer)
197
198 def test_read_exception(self):
199 stream = asyncio.StreamReader(loop=self.loop)

Callers

nothing calls this directly

Calls 5

readMethod · 0.95
create_taskMethod · 0.45
call_soonMethod · 0.45
run_until_completeMethod · 0.45
assertEqualMethod · 0.45

Tested by

no test coverage detected