hub / github.com/aio-libs/aiohttp / test_lines_parser

Function test_lines_parser

tests/test_parser_buffer.py:253–268
(buf, stream, loop)

Source from the content-addressed store, hash-verified

def test_lines_parser(buf, stream, loop):
    out = parsers.FlowControlDataQueue(stream, loop=loop)

    p = parsers.LinesParser()(out, buf)
    # Prime the generator so it is ready to receive data via send().
    next(p)
    # Feed chunks that split lines, and the CRLF terminator, across sends.
    for d in (b'line1', b'\r\n', b'lin', b'e2\r', b'\ndata'):
        p.send(d)

    # Two complete lines should be queued, each paired with its size.
    assert ([(bytearray(b'line1\r\n'), 7), (bytearray(b'line2\r\n'), 7)] ==
            list(out._buffer))
    # Signal end of stream; the parser generator is expected to finish.
    try:
        p.throw(parsers.EofStream())
    except StopIteration:
        pass

    # The trailing partial line stays in the buffer.
    assert bytes(buf) == b'data'
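The test drives the parser through the plain generator protocol: next() to prime it, send() to feed chunks, and throw() to signal end of stream. The following is a minimal, self-contained sketch of that pattern, not aiohttp's implementation. The names lines_parser and the local EofStream class are hypothetical stand-ins, and a plain list is assumed in place of FlowControlDataQueue.

```python
class EofStream(Exception):
    """Hypothetical stand-in for the end-of-stream signal used in the test."""


def lines_parser(out, buf):
    """Illustrative generator-based line splitter (not aiohttp's code).

    out: list that collects (line, size) tuples
    buf: bytearray holding bytes not yet emitted as a complete line
    """
    try:
        while True:
            chunk = yield  # suspend until the caller send()s more data
            buf.extend(chunk)
            # Emit every complete line currently in the buffer.
            while True:
                pos = buf.find(b'\n')
                if pos < 0:
                    break
                line = bytearray(buf[:pos + 1])
                del buf[:pos + 1]
                out.append((line, len(line)))
    except EofStream:
        # Returning ends the generator; the caller sees StopIteration.
        return


out, buf = [], bytearray()
p = lines_parser(out, buf)
next(p)  # prime: run up to the first yield
for d in (b'line1', b'\r\n', b'lin', b'e2\r', b'\ndata'):
    p.send(d)

try:
    p.throw(EofStream())
except StopIteration:
    pass
```

As in the test, the unterminated tail stays in buf after the generator finishes, so the caller can inspect or reuse it.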

Callers

nothing calls this directly

Calls 2

throw · Method · 0.80
send · Method · 0.45

Tested by

no test coverage detected