(self)
| 214 | asyncio.StreamReader(limit=-1, loop=self.loop) |
| 215 | |
| 216 | def test_read_limit(self): |
| 217 | stream = asyncio.StreamReader(limit=3, loop=self.loop) |
| 218 | stream.feed_data(b'chunk') |
| 219 | data = self.loop.run_until_complete(stream.read(5)) |
| 220 | self.assertEqual(b'chunk', data) |
| 221 | self.assertEqual(b'', stream._buffer) |
| 222 | |
| 223 | def test_readline(self): |
| 224 | # Read one line. 'readline' will need to wait for the data |
nothing calls this directly
no test coverage detected