MCPcopy
hub / github.com/psycopg/psycopg / test_wait_r

Function test_wait_r

tests/test_waiting.py:65–107  ·  view source on GitHub ↗
(waitfn, event, ready, interval, request)

Source from the content-addressed store, hash-verified

63@pytest.mark.parametrize("event", ["R", "RW"])
64@pytest.mark.parametrize("waitfn", waitfns)
65def test_wait_r(waitfn, event, ready, interval, request):
66 # Test that wait functions handle waiting and returning state correctly
67 # This test doesn't work on macOS for some internal race condition betwwn
68 # listen/connect/accept.
69 waitfn = getattr(waiting, waitfn)
70 wait = getattr(waiting.Wait, event)
71 ready = getattr(waiting.Ready, ready)
72 delay = interval / 2
73
74 port = None
75 ev = Event()
76
77 def writer():
78 # Wake up the socket, or let it time out, according to the expected `ready`.
79 ev.wait()
80 assert port
81 sleep(delay)
82 if ready == waiting.Ready.R:
83 with socket.create_connection(("127.0.0.1", port)):
84 pass
85
86 tasks = [spawn(writer)]
87
88 try:
89 with socket.socket() as s:
90 # Make a listening socket
91 s.bind(("127.0.0.1", 0))
92 port = s.getsockname()[1]
93 s.listen(10)
94 s.setblocking(False)
95 # Let the writer start
96 ev.set()
97 # Wait for socket ready to read or timing out
98 t0 = time.time()
99 r = waitfn(tgen(wait), s.fileno(), interval)
100 dt = time.time() - t0
101 # Check timing and received waiting state
102 assert r == ready
103 if check_timing(request):
104 exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
105 assert exptime <= dt < exptime * 1.2
106 finally:
107 gather(*tasks)
108
109
110@pytest.mark.slow

Callers

nothing calls this directly

Calls 7

filenoMethod · 0.80
spawnFunction · 0.70
tgenFunction · 0.70
check_timingFunction · 0.70
gatherFunction · 0.70
socketMethod · 0.45
setMethod · 0.45

Tested by

no test coverage detected