MCPcopy
hub / github.com/psycopg/psycopg / test_wait_r_no_linux

Function test_wait_r_no_linux

tests/test_waiting.py:115–164  ·  view source on GitHub ↗
(waitfn, ready, interval, request)

Source from the content-addressed store, hash-verified

113@pytest.mark.parametrize("ready", ["R", "NONE"])
114@pytest.mark.parametrize("waitfn", waitfns)
115def test_wait_r_no_linux(waitfn, ready, interval, request):
116 # A version of test_wait_r that works on macOS too, but doesn't allow to
117 # test for the RW wait (because it seems that the sockets returned by
118 # socketpair() is immediately w-ready, including the r one.
119 waitfn = getattr(waiting, waitfn)
120 wait = waiting.Wait.R
121 ready = getattr(waiting.Ready, ready)
122 delay = interval / 2
123
124 ev = Event()
125
126 def writer():
127 # Wake up the socket, or let it time out, according to the expected `ready`.
128 ev.wait()
129 sleep(delay)
130 if ready == waiting.Ready.R:
131 for att in range(10):
132 try:
133 ws.sendall(b"hi")
134 ws.close()
135 except Exception as ex:
136 the_ex = ex
137 sleep(0.1)
138 else:
139 break
140 else:
141 pytest.fail(
142 f"failed after many attempts. Socket: {ws}, Last error: {the_ex}"
143 )
144
145 tasks = [spawn(writer)]
146 try:
147 rs, ws = socket.socketpair()
148 rs.setblocking(False)
149 ws.setblocking(False)
150 # Let the writer start
151 ev.set()
152 # Wait for socket ready to read or timing out
153 t0 = time.time()
154 r = waitfn(tgen(wait), rs.fileno(), interval)
155 dt = time.time() - t0
156 # Check timing and received waiting state
157 assert r == ready
158 if check_timing(request):
159 exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
160 assert exptime <= dt < exptime * 1.2
161 finally:
162 gather(*tasks)
163 rs.close()
164 ws.close()
165
166
167@pytest.mark.parametrize("ready", ["R", "NONE"])

Callers

nothing calls this directly

Calls 7

filenoMethod · 0.80
spawnFunction · 0.70
tgenFunction · 0.70
check_timingFunction · 0.70
gatherFunction · 0.70
setMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected