MCPcopy
hub / github.com/psycopg/psycopg / test_wait_r_nowait

Function test_wait_r_nowait

tests/test_waiting.py:170–216  ·  view source on GitHub ↗
(waitfn, event, ready, request)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("event", ["R", "RW"])
@pytest.mark.parametrize("waitfn", waitfns)
def test_wait_r_nowait(waitfn, event, ready, request):
    """Test that wait functions handle a poll when called with no timeout.

    A listening socket is bound to a free port on 127.0.0.1. When the
    expected state is ``Ready.R`` the ``writer`` helper connects to it before
    the wait starts; otherwise nothing connects. The wait function is then
    called on the listening socket and its result is compared with the
    expected ``Ready`` value.

    Parameters:
        waitfn: name of the wait function to look up on ``waiting``.
        event: name of the ``waiting.Wait`` member to wait for.
        ready: name of the expected ``waiting.Ready`` member (supplied by a
            parametrization or fixture not visible in this block).
        request: pytest request object, passed to ``check_timing``.
    """
    waitfn = getattr(waiting, waitfn)
    wait = getattr(waiting.Wait, event)
    ready = getattr(waiting.Ready, ready)

    # Assigned once the listening socket is bound; the helpers read it
    # through the closure only after that point.
    port = None
    ev1 = Event()  # set by the test: the socket is listening, port is known
    ev2 = Event()  # set by the writer: the socket is in the expected state
    ev3 = Event()  # set by the test: the wait call is over

    def writer():
        ev1.wait()
        assert port
        if ready == waiting.Ready.R:
            # Connect so that the listening socket has something to accept
            # by the time the test starts waiting on it.
            with socket.create_connection(("127.0.0.1", port)):
                ev2.set()
        else:
            ev2.set()

    def unblocker():
        # If test doesn't pass, wake up the socket again to avoid hanging
        # forever
        if not ev3.wait(0.5):
            assert port
            with socket.create_connection(("127.0.0.1", port)):
                pass

    # NOTE(review): spawn/gather are helpers defined outside this block;
    # presumably they start and join background workers - confirm.
    t1 = spawn(writer)
    t2 = spawn(unblocker)
    try:
        with socket.socket() as s:
            s.bind(("127.0.0.1", 0))
            port = s.getsockname()[1]
            s.listen(10)
            s.setblocking(False)
            ev1.set()
            ev2.wait()
            # Use a monotonic clock: the wall clock may jump and make the
            # elapsed time below meaningless.
            t0 = time.monotonic()
            try:
                r = waitfn(tgen(wait), s.fileno())
                dt = time.monotonic() - t0
            finally:
                # Unblock the unblocker even if the wait function raised, so
                # it doesn't try to connect to a socket already closed.
                ev3.set()
            if check_timing(request):
                assert dt < 0.1
            assert r == ready
    finally:
        gather(t1, t2)
217
218
219@pytest.mark.slow

Callers

nothing calls this directly

Calls 8

filenoMethod · 0.80
spawnFunction · 0.70
tgenFunction · 0.70
check_timingFunction · 0.70
gatherFunction · 0.70
socketMethod · 0.45
setMethod · 0.45
waitMethod · 0.45

Tested by

no test coverage detected