(spair: TYPE_SOCKET_PAIR)
| 80 | |
| 81 | |
| 82 | def test_wait_for_read_write(spair: TYPE_SOCKET_PAIR) -> None: |
| 83 | a, b = spair |
| 84 | |
| 85 | assert not wait_for_read(a, 0) |
| 86 | assert wait_for_write(a, 0) |
| 87 | |
| 88 | b.send(b"x") |
| 89 | |
| 90 | assert wait_for_read(a, 0) |
| 91 | assert wait_for_write(a, 0) |
| 92 | |
| 93 | # Fill up the socket with data |
| 94 | a.setblocking(False) |
| 95 | try: |
| 96 | while True: |
| 97 | a.send(b"x" * 999999) |
| 98 | except OSError: |
| 99 | pass |
| 100 | |
| 101 | # Now it's not writable anymore |
| 102 | assert not wait_for_write(a, 0) |
| 103 | |
| 104 | |
| 105 | @pytest.mark.skipif(not hasattr(signal, "setitimer"), reason="need setitimer() support") |
nothing calls this directly
no test coverage detected
searching dependent graphs…