(self)
| 167 | self.assertEqual(b'line2', stream._buffer) |
| 168 | |
| 169 | def test_read_eof(self): |
| 170 | # read(1024) on a stream that is never fed data: once EOF is fed, the read must return b'' and the buffer must stay empty. |
| 171 | stream = asyncio.StreamReader(loop=self.loop) |
| 172 | read_task = self.loop.create_task(stream.read(1024))  # read is scheduled before any EOF is fed |
| 173 | |
| 174 | def cb(): |
| 175 | stream.feed_eof()  # signal end-of-stream; no feed_data() call precedes it |
| 176 | self.loop.call_soon(cb)  # EOF is delivered from a loop callback rather than synchronously here |
| 177 | |
| 178 | data = self.loop.run_until_complete(read_task)  # drive the loop until the read task finishes |
| 179 | self.assertEqual(b'', data)  # nothing was fed, so the read yields empty bytes |
| 180 | self.assertEqual(b'', stream._buffer)  # internal buffer holds no leftover bytes |
| 181 | |
| 182 | def test_read_until_eof(self): |
| 183 | # Read all bytes until eof. |
nothing calls this directly
no test coverage detected