(self)
| 261 | self.assertIs(request_copy.environ, request.environ) |
| 262 | |
def test_limited_stream(self):
    """
    Replay scripted read()/readline() calls against LimitedStream objects
    and compare each returned chunk with the expected bytes.
    """

    def replay(payload, limit, calls):
        # Wrap the payload in a fresh limited stream, then issue every
        # call in order, checking the bytes each one hands back.
        limited = LimitedStream(BytesIO(payload), limit)
        for method_name, args, expected in calls:
            self.assertEqual(getattr(limited, method_name)(*args), expected)

    # Drain a limited stream with a single unbounded read().
    replay(
        b"test",
        2,
        [
            ("read", (), b"te"),
            # Nothing is left for a second read().
            ("read", (), b""),
        ],
    )

    # Ask for more characters than the stream is allowed to provide.
    replay(
        b"test",
        2,
        [
            ("read", (5,), b"te"),
            # Nothing is left afterwards.
            ("readline", (5,), b""),
        ],
    )

    # Consume a stream with consecutive sized reads.
    replay(
        b"12345678",
        8,
        [
            ("read", (5,), b"12345"),
            ("read", (5,), b"678"),
            # Nothing is left afterwards.
            ("readline", (5,), b""),
        ],
    )

    # Consume a stream line by line.
    replay(
        b"1234\n5678\nabcd\nefgh\nijkl",
        24,
        [
            # A whole line, with no size argument.
            ("readline", (), b"1234\n"),
            # Fewer characters than the line holds.
            ("readline", (2,), b"56"),
            # The remainder of that partially read line.
            ("readline", (), b"78\n"),
            # A whole line, with a size larger than the line length.
            ("readline", (6,), b"abcd\n"),
            # The next line, cut off on purpose right before its newline.
            ("readline", (4,), b"efgh"),
            # What remains of that line: only the newline.
            ("readline", (), b"\n"),
            # Whatever is left.
            ("readline", (), b"ijkl"),
        ],
    )

    # Regression for #15018
    # When the stream holds a newline but the limit is smaller than the
    # amount of data provided, hitting the newline must not reset the
    # count of characters still available.
    replay(
        b"1234\nabcdef",
        9,
        [
            ("readline", (10,), b"1234\n"),
            ("readline", (3,), b"abc"),
            # This call uses up the remaining allowance.
            ("readline", (3,), b"d"),
            # Nothing is left afterwards.
            ("readline", (2,), b""),
        ],
    )

    # The same scenario, driven through read() instead of readline().
    replay(
        b"1234\nabcdef",
        9,
        [
            ("read", (6,), b"1234\na"),
            ("read", (2,), b"bc"),
            ("read", (2,), b"d"),
            ("read", (2,), b""),
            ("read", (), b""),
        ],
    )
| 319 | |
| 320 | def test_stream_read(self): |
nothing calls this directly
no test coverage detected