(self)
| 22 | class PipeTests(unittest.TestCase): |
| 23 | |
| 24 | def test_pipe_overlapped(self): |
| 25 | h1, h2 = windows_utils.pipe(overlapped=(True, True)) |
| 26 | try: |
| 27 | ov1 = _overlapped.Overlapped() |
| 28 | self.assertFalse(ov1.pending) |
| 29 | self.assertEqual(ov1.error, 0) |
| 30 | |
| 31 | ov1.ReadFile(h1, 100) |
| 32 | self.assertTrue(ov1.pending) |
| 33 | self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING) |
| 34 | ERROR_IO_INCOMPLETE = 996 |
| 35 | try: |
| 36 | ov1.getresult() |
| 37 | except OSError as e: |
| 38 | self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE) |
| 39 | else: |
| 40 | raise RuntimeError('expected ERROR_IO_INCOMPLETE') |
| 41 | |
| 42 | ov2 = _overlapped.Overlapped() |
| 43 | self.assertFalse(ov2.pending) |
| 44 | self.assertEqual(ov2.error, 0) |
| 45 | |
| 46 | ov2.WriteFile(h2, b"hello") |
| 47 | self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) |
| 48 | |
| 49 | res = _winapi.WaitForMultipleObjects([ov2.event], False, 100) |
| 50 | self.assertEqual(res, _winapi.WAIT_OBJECT_0) |
| 51 | |
| 52 | self.assertFalse(ov1.pending) |
| 53 | self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE) |
| 54 | self.assertFalse(ov2.pending) |
| 55 | self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING}) |
| 56 | self.assertEqual(ov1.getresult(), b"hello") |
| 57 | finally: |
| 58 | _winapi.CloseHandle(h1) |
| 59 | _winapi.CloseHandle(h2) |
| 60 | |
| 61 | def test_pipe_handle(self): |
| 62 | h, _ = windows_utils.pipe(overlapped=(True, True)) |
nothing calls this directly
no test coverage detected