hub / github.com/aio-libs/aiohttp / test_chunks_parser

Function test_chunks_parser

tests/test_parser_buffer.py:271–286
(stream, loop, buf)

Source from the content-addressed store, hash-verified

def test_chunks_parser(stream, loop, buf):
    out = parsers.FlowControlDataQueue(stream, loop=loop)

    p = parsers.ChunksParser(5)(out, buf)
    next(p)
    for d in (b'line1', b'lin', b'e2d', b'ata'):
        p.send(d)

    assert ([(bytearray(b'line1'), 5), (bytearray(b'line2'), 5)] ==
            list(out._buffer))
    try:
        p.throw(parsers.EofStream())
    except StopIteration:
        pass

    assert bytes(buf) == b'data'
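
The test drives a generator-based parser: it is primed with next(), fed data with send(), and terminated by throwing an end-of-stream exception into it. The sketch below reproduces that pattern in isolation. It is not aiohttp's implementation: chunks_parser and the local EofStream class are hypothetical stand-ins, a plain list replaces FlowControlDataQueue, and chunks are stored as bytes rather than (bytearray, size) tuples.

```python
class EofStream(Exception):
    """Hypothetical stand-in for parsers.EofStream."""


def chunks_parser(size, out, buf):
    """Hypothetical generator that emits fixed-size chunks from sent bytes."""
    try:
        while True:
            # Pause until at least one full chunk has been buffered.
            while len(buf) < size:
                buf.extend((yield))
            out.append(bytes(buf[:size]))
            del buf[:size]
    except EofStream:
        # End of stream: stop and leave any partial chunk in buf.
        return


out, buf = [], bytearray()
p = chunks_parser(5, out, buf)
next(p)  # prime the generator so it is paused at its first yield
for d in (b'line1', b'lin', b'e2d', b'ata'):
    p.send(d)

try:
    p.throw(EofStream())
except StopIteration:
    pass
```

Under these assumptions, out holds the two complete 5-byte chunks and buf keeps the unconsumed tail, which mirrors the two assertions in the test above.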

Callers

nothing calls this directly

Calls 2

throw (Method) · 0.80
send (Method) · 0.45

Tested by

no test coverage detected