hub / github.com/psycopg/psycopg / test_stop_after

Function test_stop_after

tests/test_notify_async.py:153–176
(aconn_cls, aconn, dsn)

Source from the content-addressed store, hash-verified

@pytest.mark.slow
@pytest.mark.timing
async def test_stop_after(aconn_cls, aconn, dsn):
    await aconn.set_autocommit(True)
    await aconn.execute("listen foo")

    async def notifier():
        async with await aconn_cls.connect(dsn, autocommit=True) as nconn:
            await nconn.execute("notify foo, '1'")
            await asleep(0.1)
            await nconn.execute("notify foo, '2'")
            await asleep(0.1)
            await nconn.execute("notify foo, '3'")

    worker = spawn(notifier)
    try:
        ns = await alist(aconn.notifies(timeout=1.0, stop_after=2))
        assert len(ns) == 2
        assert ns[0].payload == "1"
        assert ns[1].payload == "2"
    finally:
        await gather(worker)

    ns = await alist(aconn.notifies(timeout=0.0))
    assert len(ns) == 1
    assert ns[0].payload == "3"
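The behaviour this test checks can be tried without a database. The sketch below is a hypothetical stand-in, not psycopg's implementation: it assumes a notification stream can be modelled as an `asyncio.Queue` of payload strings, and the `notifies` generator and `demo` function defined here are illustrative names only. It shows the two properties the assertions rely on: `stop_after=2` ends iteration after two items and leaves the third queued, and `timeout=0.0` still yields whatever is already pending before returning.

```python
import asyncio
from typing import AsyncIterator, Optional


async def notifies(
    queue: "asyncio.Queue[str]",
    timeout: Optional[float] = None,
    stop_after: Optional[int] = None,
) -> AsyncIterator[str]:
    """Hypothetical stand-in for a notification generator (not psycopg code)."""
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    yielded = 0
    while stop_after is None or yielded < stop_after:
        try:
            # Items already pending are delivered even when timeout=0.0.
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            if deadline is None:
                item = await queue.get()
            else:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    return
                try:
                    item = await asyncio.wait_for(queue.get(), remaining)
                except asyncio.TimeoutError:
                    return
        yield item
        yielded += 1


async def demo():
    queue: "asyncio.Queue[str]" = asyncio.Queue()

    async def notifier():
        # Mirrors the test's notifier: three payloads with short pauses.
        for payload in ("1", "2", "3"):
            await queue.put(payload)
            await asyncio.sleep(0.01)

    worker = asyncio.create_task(notifier())
    try:
        first = [n async for n in notifies(queue, timeout=1.0, stop_after=2)]
    finally:
        await worker  # by now the third payload has been queued
    rest = [n async for n in notifies(queue, timeout=0.0)]
    return first, rest


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Waiting for the worker before the second call matters in the same way as `await gather(worker)` in the test: it ensures the third payload is pending before the zero-timeout read.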

Callers

nothing calls this directly

Calls 6

alist · Function · 0.85
spawn · Function · 0.70
gather · Function · 0.70
set_autocommit · Method · 0.45
execute · Method · 0.45
notifies · Method · 0.45

Tested by

no test coverage detected