@pytest.mark.parametrize("ready", ["R", "NONE"])
@pytest.mark.parametrize("waitfn", waitfns)
def test_wait_r_no_linux(waitfn, ready, interval, request):
    """Check a read-wait on a socket pair: ready after a write, or timed out.

    A wait function looked up by name on ``waiting`` is called on the read
    end of a ``socket.socketpair()`` with ``interval`` as its last argument.
    A concurrent writer task either sends data after ``interval / 2``
    (expected result ``Ready.R``) or sends nothing (expected result
    ``Ready.NONE``). When ``check_timing(request)`` is true, the elapsed
    time is also checked against the expected delay.
    """
    # A version of test_wait_r that works on macOS too, but doesn't allow
    # testing the RW wait (because it seems that the sockets returned by
    # socketpair() are immediately w-ready, including the r one).
    waitfn = getattr(waiting, waitfn)
    wait = waiting.Wait.R
    ready = getattr(waiting.Ready, ready)
    delay = interval / 2

    ev = Event()

    def writer():
        # Wake up the socket, or let it time out, according to the expected
        # `ready`.
        ev.wait()
        sleep(delay)
        if ready == waiting.Ready.R:
            the_ex = None
            for att in range(10):
                try:
                    ws.sendall(b"hi")
                    ws.close()
                except Exception as ex:
                    # Remember the last failure for the report, then retry.
                    the_ex = ex
                    sleep(0.1)
                else:
                    break
            else:
                pytest.fail(
                    f"failed after many attempts. Socket: {ws}, Last error: {the_ex}"
                )

    # Create the sockets before spawning the writer: a failure here must not
    # leave a task blocked on `ev` forever, nor leave `rs`/`ws` undefined
    # when the cleanup code runs.
    rs, ws = socket.socketpair()
    try:
        task = spawn(writer)
        try:
            rs.setblocking(False)
            ws.setblocking(False)
            # Let the writer start
            ev.set()
            # Wait for socket ready to read or timing out
            t0 = time.time()
            r = waitfn(tgen(wait), rs.fileno(), interval)
            dt = time.time() - t0
            # Check timing and received waiting state
            assert r == ready
            if check_timing(request):
                exptime = {waiting.Ready.R: delay, waiting.Ready.NONE: interval}[ready]
                assert exptime <= dt < exptime * 1.2
        finally:
            # Always release the writer before collecting it, otherwise an
            # error raised before the event was set would block here.
            ev.set()
            gather(task)
    finally:
        # Close the sockets even if collecting the writer raised.
        rs.close()
        ws.close()
| 165 | |
| 166 | |
| 167 | @pytest.mark.parametrize("ready", ["R", "NONE"]) |