MCPcopy
hub / github.com/python/mypy / test_multiple_messages

Method test_multiple_messages

mypy/test/testipc.py:76–99  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

74 assert p.exitcode == 0
75
76 def test_multiple_messages(self) -> None:
77 queue: Queue[str] = self.ctx.Queue()
78 p = self.ctx.Process(target=server_multi_message_echo, args=(queue,), daemon=True)
79 p.start()
80 connection_name = queue.get()
81 with IPCClient(connection_name, timeout=1) as client:
82 # "foo bar" with extra accents on letters.
83 # In UTF-8 encoding so we don't confuse editors opening this file.
84 fancy_text = b"f\xcc\xb6o\xcc\xb2\xf0\x9d\x91\x9c \xd0\xb2\xe2\xb7\xa1a\xcc\xb6r\xcc\x93\xcd\x98\xcd\x8c"
85 client.write(fancy_text.decode("utf-8"))
86 assert client.read() == fancy_text.decode("utf-8")
87
88 client.write("Test with spaces")
89 client.write("Test write before reading previous")
90 time.sleep(0) # yield to the server to force reading of all messages by server.
91 assert client.read() == "Test with spaces"
92 assert client.read() == "Test write before reading previous"
93
94 client.write("quit")
95 assert client.read() == "quit"
96 queue.close()
97 queue.join_thread()
98 p.join()
99 assert p.exitcode == 0
100
101 # Run test_connect_twice a lot, in the hopes of finding issues.
102 # This is really slow, so it is skipped, but can be enabled if

Callers

nothing calls this directly

Calls 7

IPCClientClass · 0.90
getMethod · 0.45
writeMethod · 0.45
decodeMethod · 0.45
readMethod · 0.45
closeMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected