| 43 | self.ctx = get_context("spawn") |
| 44 | |
| 45 | def test_transaction_large(self) -> None: |
| 46 | queue: Queue[str] = self.ctx.Queue() |
| 47 | msg = "t" * 200000 # longer than the max read size of 100_000 |
| 48 | p = self.ctx.Process(target=server, args=(msg, queue), daemon=True) |
| 49 | p.start() |
| 50 | connection_name = queue.get() |
| 51 | with IPCClient(connection_name, timeout=1) as client: |
| 52 | assert client.read() == msg |
| 53 | client.write("test") |
| 54 | queue.close() |
| 55 | queue.join_thread() |
| 56 | p.join() |
| 57 | |
| 58 | def test_connect_twice(self) -> None: |
| 59 | queue: Queue[str] = self.ctx.Queue() |