MCPcopy
hub / github.com/psycopg/psycopg / test_ctrl_c

Function test_ctrl_c

tests/test_concurrency.py:303–359  ·  view source on GitHub ↗
(conn, dsn)

Source from the content-addressed store, hash-verified

301)
302@pytest.mark.crdb("skip")
def test_ctrl_c(conn, dsn):
    """Check that an interrupt signal promptly stops a query in a subprocess.

    A child interpreter connects with a recognisable ``application_name`` and
    runs ``select pg_sleep(60)``. Once its backend shows up in
    ``pg_stat_activity`` the child is sent an interrupt (``SIGINT``, or
    ``CTRL_C_EVENT`` on Windows). The test then requires the backend to
    disappear from ``pg_stat_activity`` in less than one second.

    :param conn: connection used to poll ``pg_stat_activity``; it is switched
        to autocommit by this test.
    :param dsn: connection string handed to the child process.
    :raises AssertionError: if the child never appears, never disappears, or
        takes one second or more to go away after the signal.
    """
    # NOTE(review): presumably autocommit is needed so that every poll of
    # pg_stat_activity runs in its own transaction - confirm.
    conn.autocommit = True

    APPNAME = "test_ctrl_c"
    script = f"""\
import psycopg

with psycopg.connect({dsn!r}, application_name={APPNAME!r}) as conn:
    conn.execute("select pg_sleep(60)")
"""

    if sys.platform == "win32":
        # The child gets its own process group on Windows and is interrupted
        # with CTRL_C_EVENT instead of SIGINT.
        creationflags = sp.CREATE_NEW_PROCESS_GROUP
        sig = signal.CTRL_C_EVENT
    else:
        creationflags = 0
        sig = signal.SIGINT

    proc = None

    def run_process():
        # Runs in a helper thread: spawn the child and block until it exits.
        nonlocal proc
        proc = sp.Popen(
            [sys.executable, "-s", "-c", script], creationflags=creationflags
        )
        proc.communicate()

    t = threading.Thread(target=run_process)
    t.start()

    try:
        # Wait (up to ~2s) for the child's backend to become visible.
        for _ in range(20):
            cur = conn.execute(
                "select pid from pg_stat_activity where application_name = %s",
                (APPNAME,),
            )

            if rec := cur.fetchone():
                pid = rec[0]
                break
            time.sleep(0.1)
        else:
            raise AssertionError("process didn't start?")

        # Monotonic clock: the elapsed-time check must not be affected by
        # wall-clock adjustments happening during the test.
        t0 = time.monotonic()
        assert proc
        proc.send_signal(sig)
        proc.wait()

        # Wait (up to ~2s) for the backend to go away on the server side.
        for _ in range(20):
            cur = conn.execute(
                "select 1 from pg_stat_activity where pid = %s", (pid,)
            )
            if not cur.fetchone():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("process didn't stop?")

        t1 = time.monotonic()
        assert t1 - t0 < 1.0
    finally:
        # On any failure path don't leave the 60s sleeper (and its database
        # connection) running, nor the helper thread dangling.
        if proc is not None and proc.poll() is None:
            proc.kill()
        t.join(timeout=5.0)
360

Callers

nothing calls this directly

Calls 4

startMethod · 0.45
executeMethod · 0.45
fetchoneMethod · 0.45
waitMethod · 0.45

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…