MCPcopy
hub / github.com/django/django / ASGITest

Class ASGITest

tests/asgi/tests.py:51–805  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

49
50@override_settings(ROOT_URLCONF="asgi.urls")
51class ASGITest(SimpleTestCase):
52 async_request_factory = AsyncRequestFactory()
53
54 def setUp(self):
55 request_started.disconnect(close_old_connections)
56 self.addCleanup(request_started.connect, close_old_connections)
57
58 async def test_get_asgi_application(self):
59 """
60 get_asgi_application() returns a functioning ASGI callable.
61 """
62 application = get_asgi_application()
63 # Construct HTTP request.
64 scope = self.async_request_factory._base_scope(path="/")
65 communicator = ApplicationCommunicator(application, scope)
66 await communicator.send_input({"type": "http.request"})
67 # Read the response.
68 response_start = await communicator.receive_output()
69 self.assertEqual(response_start["type"], "http.response.start")
70 self.assertEqual(response_start["status"], 200)
71 self.assertEqual(
72 set(response_start["headers"]),
73 {
74 (b"Content-Length", b"12"),
75 (b"Content-Type", b"text/html; charset=utf-8"),
76 },
77 )
78 response_body = await communicator.receive_output()
79 self.assertEqual(response_body["type"], "http.response.body")
80 self.assertEqual(response_body["body"], b"Hello World!")
81 # Allow response.close() to finish.
82 await communicator.wait()
83
84 async def test_asgi_cookies(self):
85 application = get_asgi_application()
86 scope = self.async_request_factory._base_scope(path="/cookie/")
87 communicator = ApplicationCommunicator(application, scope)
88 await communicator.send_input({"type": "http.request"})
89 response_start = await communicator.receive_output()
90 self.assertIn((b"Set-Cookie", b"key=value; Path=/"), response_start["headers"])
91 # Allow response.close() to finish.
92 await communicator.wait()
93
94 # Python's file API is not async compatible. A third-party library such
95 # as https://github.com/Tinche/aiofiles allows passing the file to
96 # FileResponse as an async iterator. With a sync iterator
97 # StreamingHTTPResponse triggers a warning when iterating the file.
98 # assertWarnsMessage is not async compatible, so ignore_warnings for the
99 # test.
100 @ignore_warnings(module="django.core.handlers.asgi")
101 async def test_file_response(self):
102 """
103 Makes sure that FileResponse works over ASGI.
104 """
105 application = get_asgi_application()
106 # Construct HTTP request.
107 scope = self.async_request_factory._base_scope(path="/file/")
108 communicator = ApplicationCommunicator(application, scope)

Callers

nothing calls this directly

Calls 1

AsyncRequestFactoryClass · 0.90

Tested by

no test coverage detected