MCPcopy
hub / github.com/aio-libs/aiohttp / run

Function run

aiohttp/test_utils.py:105–144  ·  view source on GitHub ↗
(loop, fut)

Source from the content-addressed store, hash-verified

103 sslcontext = None
104
105 def run(loop, fut):
106 thread_loop = asyncio.new_event_loop()
107 asyncio.set_event_loop(thread_loop)
108
109 if isinstance(listen_addr, tuple):
110 host, port = listen_addr
111 server_coroutine = thread_loop.create_server(
112 lambda: TestHttpServer(keep_alive=0.5),
113 host, port, ssl=sslcontext)
114 else:
115 try:
116 os.unlink(listen_addr)
117 except FileNotFoundError:
118 pass
119 server_coroutine = thread_loop.create_unix_server(
120 lambda: TestHttpServer(keep_alive=0.5, timeout=15),
121 listen_addr, ssl=sslcontext)
122 server = thread_loop.run_until_complete(server_coroutine)
123
124 waiter = asyncio.Future(loop=thread_loop)
125 loop.call_soon_threadsafe(
126 fut.set_result, (thread_loop, waiter,
127 server.sockets[0].getsockname()))
128
129 try:
130 thread_loop.run_until_complete(waiter)
131 finally:
132 # call pending connection_made if present
133 run_briefly(thread_loop)
134
135 # close opened transports
136 for tr in transports:
137 tr.close()
138
139 run_briefly(thread_loop) # call close callbacks
140
141 server.close()
142 thread_loop.stop()
143 thread_loop.close()
144 gc.collect()
145
146 fut = asyncio.Future(loop=loop)
147 server_thread = threading.Thread(target=run, args=(loop, fut))

Callers

nothing calls this directly

Calls 6

TestHttpServerClass · 0.85
run_brieflyFunction · 0.85
create_unix_serverMethod · 0.80
create_serverMethod · 0.45
closeMethod · 0.45
stopMethod · 0.45

Tested by

no test coverage detected