| 63 | @pytest.mark.parametrize("event", ["R", "RW"]) |
| 64 | @pytest.mark.parametrize("waitfn", waitfns) |
| 65 | def test_wait_r(waitfn, event, ready, interval, request): |
| 66 | # Test that wait functions handle waiting and returning state correctly |
| 67 | # This test doesn't work on macOS for some internal race condition betwwn |
| 68 | # listen/connect/accept. |
| 69 | waitfn = getattr(waiting, waitfn) |
| 70 | wait = getattr(waiting.Wait, event) |
| 71 | ready = getattr(waiting.Ready, ready) |
| 72 | delay = interval / 2 |
| 73 | |
| 74 | port = None |
| 75 | ev = Event() |
| 76 | |
| 77 | def writer(): |
| 78 | # Wake up the socket, or let it time out, according to the expected `ready`. |
| 79 | ev.wait() |
| 80 | assert port |
| 81 | sleep(delay) |
| 82 | if ready == waiting.Ready.R: |
| 83 | with socket.create_connection(("127.0.0.1", port)): |
| 84 | pass |
| 85 | |
| 86 | tasks = [spawn(writer)] |
| 87 | |
| 88 | try: |
| 89 | with socket.socket() as s: |
| 90 | # Make a listening socket |
| 91 | s.bind(("127.0.0.1", 0)) |
| 92 | port = s.getsockname()[1] |
| 93 | s.listen(10) |
| 94 | s.setblocking(False) |
| 95 | # Let the writer start |
| 96 | ev.set() |
| 97 | # Wait for socket ready to read or timing out |
| 98 | t0 = time.time() |
| 99 | r = waitfn(tgen(wait), s.fileno(), interval) |
| 100 | dt = time.time() - t0 |
| 101 | # Check timing and received waiting state |
| 102 | assert r == ready |
| 103 | if check_timing(request): |
| 104 | exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready] |
| 105 | assert exptime <= dt < exptime * 1.2 |
| 106 | finally: |
| 107 | gather(*tasks) |
| 108 | |
| 109 | |
| 110 | @pytest.mark.slow |