(stream, loop, buf)
| 269 | |
| 270 | |
def test_chunks_parser(stream, loop, buf):
    """Drive ``ChunksParser(5)`` with unevenly split input.

    The input pieces add up to ``b'line1line2data'``.  The test checks that
    two ``(bytearray, 5)`` entries end up in the output queue and that the
    trailing ``b'data'`` is what ``buf`` holds once ``EofStream`` has been
    thrown into the parser.
    """
    queue = parsers.FlowControlDataQueue(stream, loop=loop)
    parser = parsers.ChunksParser(5)(queue, buf)

    # Advance to the first yield so that send() can be used afterwards.
    next(parser)

    pieces = (b'line1', b'lin', b'e2d', b'ata')
    for piece in pieces:
        parser.send(piece)

    queued = [(bytearray(b'line1'), 5), (bytearray(b'line2'), 5)]
    assert queued == list(queue._buffer)

    # Signal end of stream; a StopIteration coming back out of the parser
    # is tolerated here rather than required.
    try:
        parser.throw(parsers.EofStream())
    except StopIteration:
        pass

    assert bytes(buf) == b'data'