MCPcopy Index your code
hub / github.com/python/cpython / test_pipe_overlapped

Method test_pipe_overlapped

Lib/test/test_asyncio/test_windows_utils.py:24–59  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

22class PipeTests(unittest.TestCase):
23
24 def test_pipe_overlapped(self):
25 h1, h2 = windows_utils.pipe(overlapped=(True, True))
26 try:
27 ov1 = _overlapped.Overlapped()
28 self.assertFalse(ov1.pending)
29 self.assertEqual(ov1.error, 0)
30
31 ov1.ReadFile(h1, 100)
32 self.assertTrue(ov1.pending)
33 self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING)
34 ERROR_IO_INCOMPLETE = 996
35 try:
36 ov1.getresult()
37 except OSError as e:
38 self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE)
39 else:
40 raise RuntimeError('expected ERROR_IO_INCOMPLETE')
41
42 ov2 = _overlapped.Overlapped()
43 self.assertFalse(ov2.pending)
44 self.assertEqual(ov2.error, 0)
45
46 ov2.WriteFile(h2, b"hello")
47 self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
48
49 res = _winapi.WaitForMultipleObjects([ov2.event], False, 100)
50 self.assertEqual(res, _winapi.WAIT_OBJECT_0)
51
52 self.assertFalse(ov1.pending)
53 self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE)
54 self.assertFalse(ov2.pending)
55 self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
56 self.assertEqual(ov1.getresult(), b"hello")
57 finally:
58 _winapi.CloseHandle(h1)
59 _winapi.CloseHandle(h2)
60
61 def test_pipe_handle(self):
62 h, _ = windows_utils.pipe(overlapped=(True, True))

Callers

nothing calls this directly

Calls 6

assertFalseMethod · 0.80
assertTrueMethod · 0.80
assertInMethod · 0.80
pipeMethod · 0.45
assertEqualMethod · 0.45
getresultMethod · 0.45

Tested by

no test coverage detected