hub / github.com/aio-libs/aiohttp / test_read_eof

Method test_read_eof

tests/test_streams.py:127–140
(self)

Source from the content-addressed store, hash-verified

        self.assertEqual(b'line2', data)

    def test_read_eof(self):
        # Read bytes, stop at eof.
        stream = self._make_one()
        read_task = asyncio.Task(stream.read(1024), loop=self.loop)

        def cb():
            stream.feed_eof()
        self.loop.call_soon(cb)

        data = self.loop.run_until_complete(read_task)
        self.assertEqual(b'', data)

        data = self.loop.run_until_complete(stream.read())
        self.assertIs(data, streams.EOF_MARKER)

    @mock.patch('aiohttp.streams.internal_logger')
    def test_read_eof_infinit(self, internal_logger):
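
The pattern exercised by test_read_eof (start a read that has no data to return, then signal EOF from a callback scheduled on the loop) can be sketched in a self-contained form. This is a minimal sketch only: it assumes the standard library's asyncio.StreamReader as a stand-in for the stream built by _make_one(), and the name read_until_eof is hypothetical. It is not aiohttp's implementation, and the stand-in has no equivalent of streams.EOF_MARKER.

```python
import asyncio


async def read_until_eof():
    # Assumption: asyncio.StreamReader stands in for the aiohttp stream
    # that the original test obtains from self._make_one().
    reader = asyncio.StreamReader()
    loop = asyncio.get_running_loop()

    # Start the read first; it has to wait because nothing has been fed.
    read_task = asyncio.ensure_future(reader.read(1024))

    # Schedule EOF on the loop, mirroring the cb() callback in the test.
    loop.call_soon(reader.feed_eof)

    # The pending read finishes once EOF is fed.
    first = await read_task
    # A second read after EOF returns immediately.
    second = await reader.read()
    return first, second


first, second = asyncio.run(read_until_eof())
```

With the standard-library reader both reads yield empty bytes. The aiohttp test above instead asserts that the second read returns the streams.EOF_MARKER sentinel, so the second assertion is specific to aiohttp's stream class.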

Callers

nothing calls this directly

Calls 2

_make_one · Method · 0.95
read · Method · 0.45

Tested by

no test coverage detected