MCPcopy
hub / github.com/psycopg/psycopg / test_wait_w

Function test_wait_w

tests/test_waiting_async.py:230–245  ·  view source on GitHub ↗
(waitfn, event, request)

Source from the content-addressed store, hash-verified

228@pytest.mark.parametrize("event", ["W", "RW"])
229@pytest.mark.parametrize("waitfn", waitfns)
230async def test_wait_w(waitfn, event, request):
231 # Test that wait functions handle waiting and returning state correctly
232 waitfn = getattr(waiting, waitfn)
233 wait = getattr(waiting.Wait, event)
234
235 rs, ws = socket.socketpair() # the w socket is already ready for writing
236 rs.setblocking(False)
237 ws.setblocking(False)
238 with rs, ws:
239 t0 = time.time()
240 r = await waitfn(tgen(wait), ws.fileno(), 0.5)
241 dt = time.time() - t0
242 # Check timing and received waiting state
243 assert r == waiting.Ready.W
244 if check_timing(request):
245 assert dt < 0.1
246
247
248@pytest.mark.parametrize("waitfn", waitfns)

Callers

nothing calls this directly

Calls 3

filenoMethod · 0.80
tgenFunction · 0.70
check_timingFunction · 0.70

Tested by

no test coverage detected