MCPcopy
hub / github.com/django/django / test_limited_stream

Method test_limited_stream

tests/requests_tests/tests.py:263–318  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

261 self.assertIs(request_copy.environ, request.environ)
262
def test_limited_stream(self):
    # Checks what LimitedStream.read() and LimitedStream.readline() return
    # for several payload/limit combinations. Each scenario is written as a
    # table of calls that is replayed, in order, against a fresh stream.

    def replay(payload, limit, calls):
        # Wrap payload in a new LimitedStream(BytesIO(payload), limit) and
        # assert the result of every (method name, args, expected) entry.
        limited = LimitedStream(BytesIO(payload), limit)
        for method_name, args, expected in calls:
            self.assertEqual(getattr(limited, method_name)(*args), expected)

    # Unsized read() of a 4-byte payload limited to 2 bytes.
    replay(
        b"test",
        2,
        [
            ("read", (), b"te"),
            # A second read yields nothing.
            ("read", (), b""),
        ],
    )

    # Ask for more bytes than the limit allows.
    replay(
        b"test",
        2,
        [
            ("read", (5,), b"te"),
            # A following readline yields nothing.
            ("readline", (5,), b""),
        ],
    )

    # Consecutive sized reads.
    replay(
        b"12345678",
        8,
        [
            ("read", (5,), b"12345"),
            ("read", (5,), b"678"),
            # A following readline yields nothing.
            ("readline", (5,), b""),
        ],
    )

    # Line-oriented reads.
    replay(
        b"1234\n5678\nabcd\nefgh\nijkl",
        24,
        [
            # Whole line, no size given.
            ("readline", (), b"1234\n"),
            # Size smaller than the line.
            ("readline", (2,), b"56"),
            # Remainder of the partially read line.
            ("readline", (), b"78\n"),
            # Size larger than the line: stops at the newline.
            ("readline", (6,), b"abcd\n"),
            # Size that ends exactly before the newline.
            ("readline", (4,), b"efgh"),
            # Only the newline is left on that line.
            ("readline", (), b"\n"),
            # Whatever remains.
            ("readline", (), b"ijkl"),
        ],
    )

    # Regression for #15018
    # When the payload contains a newline but the limit is smaller than
    # the payload, reading past the newline must not reset the number of
    # bytes still available.
    replay(
        b"1234\nabcdef",
        9,
        [
            ("readline", (10,), b"1234\n"),
            ("readline", (3,), b"abc"),
            # This call uses up the last available byte.
            ("readline", (3,), b"d"),
            # Nothing is left afterwards.
            ("readline", (2,), b""),
        ],
    )

    # Same scenario using read() instead of readline().
    replay(
        b"1234\nabcdef",
        9,
        [
            ("read", (6,), b"1234\na"),
            ("read", (2,), b"bc"),
            ("read", (2,), b"d"),
            ("read", (2,), b""),
            ("read", (), b""),
        ],
    )
319
320 def test_stream_read(self):

Callers

nothing calls this directly

Calls 3

read — Method · 0.95
readline — Method · 0.95
LimitedStream — Class · 0.90

Tested by

no test coverage detected