(buffered, iterable)
@pytest.mark.parametrize("buffered", (True, False))
@pytest.mark.parametrize("iterable", (True, False))
def test_run_wsgi_apps(buffered, iterable):
    """Run several differently shaped WSGI apps through ``run_wsgi_app``.

    Every app must produce status ``"200 OK"``, a single
    ``Content-Type: text/html`` header and the body ``"Hello World!"``.
    The test is parametrized over ``buffered`` (forwarded to
    ``run_wsgi_app``) and ``iterable`` (whether each app is first wrapped
    in ``iterable_middleware``).
    """
    # Marker storage: filled when the close-dependent app is called and
    # emptied again only by the ``close()`` of the iterable it returns.
    leaked_data = []

    ok_status = "200 OK"
    html_header = ("Content-Type", "text/html")

    def simple_app(environ, start_response):
        # Plain list return value.
        start_response(ok_status, [html_header])
        return ["Hello World!"]

    def yielding_app(environ, start_response):
        # Generator that starts the response before the first chunk.
        start_response(ok_status, [html_header])
        yield from ("Hello ", "World!")

    def late_start_response(environ, start_response):
        # Generator that calls start_response only after two chunks
        # have already been yielded.
        yield from ("Hello ", "World")
        start_response(ok_status, [html_header])
        yield "!"

    class ClosableBody:
        """Iterable whose ``close()`` removes the marker left by the app."""

        def __iter__(self):
            yield from ("Hello ", "World", "!")

        def close(self):
            assert leaked_data.pop() == "harhar"

    def depends_on_close(environ, start_response):
        # Leaves a marker behind that only ClosableBody.close() cleans up.
        leaked_data.append("harhar")
        start_response(ok_status, [html_header])
        return ClosableBody()

    candidates = (simple_app, yielding_app, late_start_response, depends_on_close)

    for candidate in candidates:
        wsgi_app = iterable_middleware(candidate) if iterable else candidate
        app_iter, status, headers = run_wsgi_app(wsgi_app, {}, buffered=buffered)

        assert status == "200 OK"
        assert list(headers) == [("Content-Type", "text/html")]
        assert "".join(app_iter) == "Hello World!"

        if hasattr(app_iter, "close"):
            app_iter.close()

        # After an optional close() no marker may remain.
        assert not leaked_data
| 707 | |
| 708 | |
| 709 | @pytest.mark.parametrize("buffered", (True, False)) |
nothing calls this directly
no test coverage detected