MCPcopy
hub / github.com/psycopg/psycopg / test_notify

Function test_notify

tests/test_notify_async.py:56–95  ·  view source on GitHub ↗
(aconn_cls, aconn, dsn)

Source from the content-addressed store, hash-verified

@pytest.mark.slow
@pytest.mark.timing
async def test_notify(aconn_cls, aconn, dsn):
    """Check delivery and timing of notifications read via ``aconn.notifies()``.

    A notifier task opens its own autocommit connection and issues two
    ``notify foo`` commands, each preceded by a 0.25s sleep. A receiver task
    issues ``listen foo`` on ``aconn`` and records every notification together
    with its arrival time. The test then verifies the sender pid, channel,
    payload and arrival delay (0.25s and 0.5s, within 0.05s) of both.
    """
    npid = None
    received: list[tuple[Notify, float]] = []

    async def notifier():
        nonlocal npid
        async with await aconn_cls.connect(dsn, autocommit=True) as nconn:
            # Keep the sender's backend pid to compare against Notify.pid.
            npid = nconn.pgconn.backend_pid

            for command in ("notify foo, '1'", "notify foo, '2'"):
                await asleep(0.25)
                await nconn.execute(command)

    async def receiver():
        await aconn.set_autocommit(True)
        cursor = aconn.cursor()
        await cursor.execute("listen foo")
        stream = aconn.notifies()
        async for notification in stream:
            received.append((notification, time()))
            if len(received) >= 2:
                # Both expected notifications arrived: close the generator.
                await stream.aclose()

    # Reference instant for the arrival delays checked below.
    t0 = time()
    await gather(spawn(notifier), spawn(receiver))
    assert len(received) == 2

    # (payload, expected delay from t0 in seconds), in sending order.
    expected = (("1", 0.25), ("2", 0.5))
    for (notification, arrived), (payload, delay) in zip(received, expected):
        assert notification.pid == npid
        assert notification.channel == "foo"
        assert notification.payload == payload
        assert arrived - t0 == pytest.approx(delay, abs=0.05)
96
97
98@pytest.mark.slow

Callers

nothing calls this directly

Calls 2

spawnFunction · 0.70
gatherFunction · 0.70

Tested by

no test coverage detected