MCPcopy
hub / github.com/psycopg/psycopg / test_wait_r_nowait

Function test_wait_r_nowait

tests/test_waiting_async.py:178–224  ·  view source on GitHub ↗
(waitfn, event, ready, request)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("event", ["R", "RW"])
@pytest.mark.parametrize("waitfn", waitfns)
async def test_wait_r_nowait(waitfn, event, ready, request):
    """Test that wait functions handle a poll when called with no timeout.

    A listening socket is opened on an ephemeral local port. When the
    expected state is ``Ready.R`` the ``writer`` task connects to it before
    the wait starts; otherwise nothing connects. The wait function is then
    called on the listening socket's file descriptor, and its result is
    compared with the expected ``Ready`` value.

    :param waitfn: name of the attribute of `waiting` to test.
    :param event: name of the `waiting.Wait` member to wait for.
    :param ready: name of the expected `waiting.Ready` member (presumably
        parametrized or provided by a fixture outside this excerpt).
    :param request: pytest request object, passed to `check_timing()`.
    """
    waitfn = getattr(waiting, waitfn)
    wait = getattr(waiting.Wait, event)
    ready = getattr(waiting.Ready, ready)

    port = None
    ev1 = AEvent()  # set when the socket is listening and `port` is known
    ev2 = AEvent()  # set when the writer has prepared the socket state
    ev3 = AEvent()  # set when the unblocker is no longer needed

    async def writer():
        await ev1.wait()
        assert port
        if ready == waiting.Ready.R:
            # Connect to the listening socket, then signal while connected.
            with socket.create_connection(("127.0.0.1", port)):
                ev2.set()
        else:
            ev2.set()

    async def unblocker():
        # If test doesn't pass, wake up the socket again to avoid hanging forever
        if not await ev3.wait_timeout(0.5):
            assert port
            with socket.create_connection(("127.0.0.1", port)):
                pass

    t1 = spawn(writer)
    t2 = spawn(unblocker)
    try:
        with socket.socket() as s:
            s.bind(("127.0.0.1", 0))
            port = s.getsockname()[1]
            s.listen(10)
            s.setblocking(False)
            ev1.set()
            await ev2.wait()
            # Use a monotonic clock: wall-clock adjustments must not affect
            # the measured duration.
            t0 = time.monotonic()
            r = await waitfn(tgen(wait), s.fileno())
            dt = time.monotonic() - t0
            if check_timing(request):
                assert dt < 0.1
            assert r == ready
    finally:
        # Unblock the unblocker on every exit path. If the body failed, the
        # listening socket is already closed: letting the unblocker time out
        # and try to connect would raise here and mask the original error.
        ev3.set()
        await gather(t1, t2)
225
226
227@pytest.mark.slow

Callers

nothing calls this directly

Calls 9

fileno · Method · 0.80
AEvent · Class · 0.70
spawn · Function · 0.70
tgen · Function · 0.70
check_timing · Function · 0.70
gather · Function · 0.70
socket · Method · 0.45
set · Method · 0.45
wait · Method · 0.45

Tested by

no test coverage detected