MCPcopy
hub / github.com/psycopg/psycopg / test_notify_query_notify

Function test_notify_query_notify

tests/test_notify.py:291–345  ·  view source on GitHub ↗
(conn_cls, dsn, sleep_on, listen_by)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("sleep_on", ["server", "client"])
@pytest.mark.parametrize("listen_by", ["callback", "generator"])
def test_notify_query_notify(conn_cls, dsn, sleep_on, listen_by):
    """Interleave notifications with listener activity and check none is lost.

    A notifier connection sends the payloads "0", "1", "2" on the ``counter``
    channel, pausing between sends. Meanwhile a listener connection pauses
    either on the server (``pg_sleep``) or on the client (``sleep``),
    depending on *sleep_on*, and collects notifications either through a
    notify handler or by iterating ``conn.notifies()``, depending on
    *listen_by*. The collected payloads must be exactly ``[0, 1, 2]``.
    """
    listening = Event()
    received: list[int] = []

    def nap(conn):
        # Pause for 0.2s, either inside the server or in the client.
        if sleep_on == "server":
            conn.execute("select pg_sleep(0.2)")
            return
        assert sleep_on == "client"
        sleep(0.2)

    def notifier():
        with conn_cls.connect(dsn, autocommit=True) as conn:
            sleep(0.1)
            for payload in map(str, range(3)):
                conn.execute("select pg_notify('counter', %s)", (payload,))
                sleep(0.2)

    def callback_listener():
        def on_notify(n):
            received.append(int(n.payload))

        with conn_cls.connect(dsn, autocommit=True) as conn:
            conn.add_notify_handler(on_notify)

            conn.execute("listen counter")
            # Tell the main flow that LISTEN has been issued.
            listening.set()

            # NOTE(review): the empty query presumably gives the connection
            # a chance to deliver pending notifications to the handler.
            for _ in range(3):
                nap(conn)
                conn.execute("")

    def generator_listener():
        with conn_cls.connect(dsn, autocommit=True) as conn:
            conn.execute("listen counter")
            # Tell the main flow that LISTEN has been issued.
            listening.set()

            def drain():
                for n in conn.notifies(timeout=0.2):
                    received.append(int(n.payload))

            drain()
            nap(conn)
            drain()

    if listen_by == "callback":
        listener = callback_listener
    else:
        listener = generator_listener

    # Start the notifier only after the listener has issued LISTEN.
    tasks = [spawn(listener)]
    listening.wait()
    tasks.append(spawn(notifier))
    gather(*tasks)

    assert received == list(range(3))

Callers

nothing calls this directly

Calls 3

spawnFunction · 0.70
gatherFunction · 0.70
waitMethod · 0.45

Tested by

no test coverage detected