hub / github.com/python/cpython / test_readline

Method test_readline

Lib/test/test_asyncio/test_streams.py:223–238
(self)

Source from the content-addressed store, hash-verified

221         self.assertEqual(b'', stream._buffer)
222
223     def test_readline(self):
224         # Read one line. 'readline' will need to wait for the data
225         # to come from 'cb'
226         stream = asyncio.StreamReader(loop=self.loop)
227         stream.feed_data(b'chunk1 ')
228         read_task = self.loop.create_task(stream.readline())
229
230         def cb():
231             stream.feed_data(b'chunk2 ')
232             stream.feed_data(b'chunk3 ')
233             stream.feed_data(b'\n chunk4')
234         self.loop.call_soon(cb)
235
236         line = self.loop.run_until_complete(read_task)
237         self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
238         self.assertEqual(b' chunk4', stream._buffer)
239
240     def test_readline_limit_with_existing_data(self):
241         # Read one line. The data is in StreamReader's buffer
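
The behavior this test exercises can be reproduced outside the unittest harness. The following is a minimal sketch, not the test suite's own code: it assumes a recent Python 3 where `asyncio.run` is available, creates the `StreamReader` inside a running coroutine instead of passing `loop=self.loop`, and uses the illustrative names `demo` and `feed_rest` (neither appears in the original). Like the test, it inspects the private `_buffer` attribute, which is an implementation detail and may change.

```python
import asyncio


async def demo():
    # Create the reader inside a coroutine so it binds to the running loop
    # (the original test passes loop=self.loop explicitly instead).
    reader = asyncio.StreamReader()
    reader.feed_data(b'chunk1 ')

    # No newline is buffered yet, so readline() has to wait for more data.
    read_task = asyncio.create_task(reader.readline())

    def feed_rest():
        # Hypothetical callback mirroring 'cb' in the test: the newline
        # arrives in the last chunk, followed by data for the next line.
        reader.feed_data(b'chunk2 ')
        reader.feed_data(b'chunk3 ')
        reader.feed_data(b'\n chunk4')

    asyncio.get_running_loop().call_soon(feed_rest)

    line = await read_task
    # _buffer is private; read here only to mirror the test's assertion.
    leftover = bytes(reader._buffer)
    return line, leftover


line, leftover = asyncio.run(demo())
print(line)      # b'chunk1 chunk2 chunk3 \n'
print(leftover)  # b' chunk4'
```

The returned line includes the trailing newline, and everything fed after the newline stays buffered for the next read.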

Callers

nothing calls this directly

Calls 6

feed_data · Method · 0.95
readline · Method · 0.95
create_task · Method · 0.45
call_soon · Method · 0.45
run_until_complete · Method · 0.45
assertEqual · Method · 0.45

Tested by

no test coverage detected