(self)
| 221 | self.assertEqual(b'', stream._buffer) |
| 222 | |
| 223 | def test_readline(self): |
| 224 | # Read one line. 'readline' will need to wait for the data |
| 225 | # to come from 'cb' |
| 226 | stream = asyncio.StreamReader(loop=self.loop) |
| 227 | stream.feed_data(b'chunk1 ') |
| 228 | read_task = self.loop.create_task(stream.readline()) |
| 229 | |
| 230 | def cb(): |
| 231 | stream.feed_data(b'chunk2 ') |
| 232 | stream.feed_data(b'chunk3 ') |
| 233 | stream.feed_data(b'\n chunk4') |
| 234 | self.loop.call_soon(cb) |
| 235 | |
| 236 | line = self.loop.run_until_complete(read_task) |
| 237 | self.assertEqual(b'chunk1 chunk2 chunk3 \n', line) |
| 238 | self.assertEqual(b' chunk4', stream._buffer) |
| 239 | |
| 240 | def test_readline_limit_with_existing_data(self): |
| 241 | # Read one line. The data is in StreamReader's buffer |
nothing calls this directly
no test coverage detected