(self)
| 1274 | |
@gen_test
def test_pipe_iostream(self):
    # Exercise a connected (read stream, write stream) pair end to end:
    # delimiter-based read, fixed-size read, then read-until-close.
    reader, writer = yield self.make_iostream_pair()

    # The payload is deliberately split across two writes so the
    # delimiter read below has to span both chunks.
    writer.write(b"hel")
    writer.write(b"lo world")

    # Everything up to and including the first space.
    up_to_space = yield reader.read_until(b" ")
    self.assertEqual(up_to_space, b"hello ")

    # Exactly three more bytes.
    next_three = yield reader.read_bytes(3)
    self.assertEqual(next_three, b"wor")

    # Close the writing side first so read_until_close() can resolve
    # with whatever is still pending.
    writer.close()

    remainder = yield reader.read_until_close()
    self.assertEqual(remainder, b"ld")

    reader.close()
| 1294 | |
| 1295 | @gen_test |
| 1296 | def test_pipe_iostream_big_write(self): |
nothing calls this directly
no test coverage detected