MCPcopy
hub / github.com/psycopg/psycopg / test_multiprocess_close

Function test_multiprocess_close

tests/test_concurrency.py:69–107  ·  view source on GitHub ↗
(dsn, tmpdir)

Source from the content-addressed store, hash-verified

@pytest.mark.slow
@pytest.mark.subprocess
def test_multiprocess_close(dsn, tmpdir):
    """Use a connection in a thread while a child process starts and exits.

    A helper module ``mptest`` is written into *tmpdir* and driven by a small
    script executed in a separate interpreter, with *tmpdir* prepended to
    ``PYTHONPATH`` so that the script can import it. The script starts a
    thread that connects to *dsn* and executes ``select 1`` ten times, starts
    a ``multiprocessing.Process`` while the thread is still running, then
    joins the thread.

    The test passes if the interpreter exits successfully (otherwise
    ``check_output`` raises ``CalledProcessError``) and writes nothing to
    stdout or stderr.
    """
    # Check the problem reported in psycopg2#829
    # Subprocess gcs the copy of the fd after fork so it closes connection.
    module = f"""\
import time
import psycopg

def thread():
    conn = psycopg.connect({dsn!r})
    curs = conn.cursor()
    for i in range(10):
        curs.execute("select 1")
        time.sleep(0.1)

def process():
    time.sleep(0.2)
"""

    script = """\
import time
import threading
import multiprocessing
import mptest

t = threading.Thread(target=mptest.thread, name='mythread')
t.start()
time.sleep(0.2)
multiprocessing.Process(target=mptest.process, name='myprocess').start()
t.join()
"""

    with (tmpdir / "mptest.py").open("w") as f:
        f.write(module)

    # Make the helper module importable from the child interpreter. The value
    # is built from plain strings so it doesn't depend on how the fixture
    # object implements `+`.
    env = dict(os.environ)
    env["PYTHONPATH"] = str(tmpdir) + os.pathsep + env.get("PYTHONPATH", "")

    # stderr is folded into stdout so that warnings and tracebacks emitted by
    # the child are part of the checked output.
    out = sp.check_output(
        [sys.executable, "-c", script], stderr=sp.STDOUT, env=env
    ).decode("utf8", "replace")

    # Report the last line of output (usually the most informative one); fall
    # back to the raw output if it contains only whitespace, in which case
    # there is no last line to index.
    lines = out.strip().splitlines()
    assert out == "", lines[-1] if lines else repr(out)
108
109
110@pytest.mark.slow

Callers

nothing calls this directly

Calls 3

openMethod · 0.45
writeMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected