hub / github.com/python/mypy / test_transaction_large

Method test_transaction_large

mypy/test/testipc.py:45–56
(self)

Source from the content-addressed store, hash-verified

43         self.ctx = get_context("spawn")
44
45     def test_transaction_large(self) -> None:
46         queue: Queue[str] = self.ctx.Queue()
47         msg = "t" * 200000  # longer than the max read size of 100_000
48         p = self.ctx.Process(target=server, args=(msg, queue), daemon=True)
49         p.start()
50         connection_name = queue.get()
51         with IPCClient(connection_name, timeout=1) as client:
52             assert client.read() == msg
53             client.write("test")
54         queue.close()
55         queue.join_thread()
56         p.join()
57
58     def test_connect_twice(self) -> None:
59         queue: Queue[str] = self.ctx.Queue()
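
The test sends a 200000-character message because it is longer than the 100_000 maximum read size mentioned in the comment, so a single read call cannot return it whole. The sketch below illustrates the general idea of reassembling such a message from several bounded reads. It is not mypy's IPC implementation: it uses a plain `socket.socketpair()` instead of `IPCClient`, and the names `MAX_READ`, `read_all`, `send_then_close`, and `round_trip` are hypothetical helpers introduced only for this illustration.

```python
import socket
import threading

# Assumption: illustrative chunk size echoing the comment in the test above.
MAX_READ = 100_000


def read_all(sock: socket.socket, chunk_size: int = MAX_READ) -> bytes:
    """Read until the peer shuts down, taking at most chunk_size bytes per call."""
    chunks = []
    while True:
        chunk = sock.recv(chunk_size)
        if not chunk:  # empty bytes: the sender has closed its write side
            break
        chunks.append(chunk)
    return b"".join(chunks)


def send_then_close(sock: socket.socket, payload: bytes) -> None:
    """Send the whole payload, then signal end-of-message by shutting down writes."""
    sock.sendall(payload)
    sock.shutdown(socket.SHUT_WR)


def round_trip(payload: bytes) -> bytes:
    """Push payload through a connected socket pair and return what was received."""
    left, right = socket.socketpair()
    with left, right:
        # Send from a separate thread so a full socket buffer cannot deadlock us.
        sender = threading.Thread(target=send_then_close, args=(left, payload))
        sender.start()
        received = read_all(right)
        sender.join()
    return received


msg = ("t" * 200000).encode()
assert round_trip(msg) == msg
```

A reader that called `recv` only once would see at most `MAX_READ` bytes of the payload, which is the kind of truncation a large-message test like this one is presumably meant to catch.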

Callers

nothing calls this directly

Calls 6

IPCClient · Class · 0.90
get · Method · 0.45
read · Method · 0.45
write · Method · 0.45
close · Method · 0.45
join · Method · 0.45

Tested by

no test coverage detected