MCPcopy
hub / github.com/aio-libs/aiohttp / test_readline

Method test_readline

tests/test_streams.py:184–202  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

182 ValueError, self.loop.run_until_complete, stream.read(2))
183
184 def test_readline(self):
185 # Read one line. 'readline' will need to wait for the data
186 # to come from 'cb'
187 stream = self._make_one()
188 stream.feed_data(b'chunk1 ')
189 read_task = asyncio.Task(stream.readline(), loop=self.loop)
190
191 def cb():
192 stream.feed_data(b'chunk2 ')
193 stream.feed_data(b'chunk3 ')
194 stream.feed_data(b'\n chunk4')
195 self.loop.call_soon(cb)
196
197 line = self.loop.run_until_complete(read_task)
198 self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)
199
200 stream.feed_eof()
201 data = self.loop.run_until_complete(stream.read())
202 self.assertEqual(b' chunk4', data)
203
204 def test_readline_limit_with_existing_data(self):
205 # Read one line. The data is in StreamReader's buffer

Callers

nothing calls this directly

Calls 5

_make_oneMethod · 0.95
feed_dataMethod · 0.45
readlineMethod · 0.45
feed_eofMethod · 0.45
readMethod · 0.45

Tested by

no test coverage detected