hub / github.com/psycopg/psycopg / test_stop_after_batch

Function test_stop_after_batch

tests/test_notify_async.py:180–197
(aconn_cls, aconn, dsn)

Source from the content-addressed store, hash-verified

@pytest.mark.timing
async def test_stop_after_batch(aconn_cls, aconn, dsn):
    await aconn.set_autocommit(True)
    await aconn.execute("listen foo")

    async def notifier():
        async with await aconn_cls.connect(dsn, autocommit=True) as nconn:
            async with nconn.transaction():
                await nconn.execute("notify foo, '1'")
                await nconn.execute("notify foo, '2'")

    worker = spawn(notifier)
    try:
        ns = await alist(aconn.notifies(timeout=1.0, stop_after=1))
        assert len(ns) == 2
        assert ns[0].payload == "1"
        assert ns[1].payload == "2"
    finally:
        await gather(worker)

Callers

nothing calls this directly

Calls 6

alist · Function · 0.85
spawn · Function · 0.70
gather · Function · 0.70
set_autocommit · Method · 0.45
execute · Method · 0.45
notifies · Method · 0.45
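
The definitions of the three helper functions listed above (alist, spawn, gather) are not shown on this page. To make the test's control flow easier to follow, here is a minimal sketch of what such helpers might look like on plain asyncio. These are hypothetical stand-ins written for illustration, not psycopg's actual test utilities, and the demo coroutine and its inner functions are likewise invented.

```python
import asyncio


async def alist(aiter):
    """Collect every item produced by an async iterator into a list."""
    return [item async for item in aiter]


def spawn(coro_fn, *args):
    """Schedule coro_fn(*args) as a background task (requires a running loop)."""
    return asyncio.create_task(coro_fn(*args))


async def gather(*tasks):
    """Wait for all given tasks and return their results in order."""
    return await asyncio.gather(*tasks)


async def demo():
    # Stand-in for aconn.notifies(...): an async generator yielding two items.
    async def numbers():
        for i in (1, 2):
            yield i

    # Stand-in for the notifier coroutine run in the background.
    async def work():
        await asyncio.sleep(0)
        return "done"

    worker = spawn(work)
    try:
        items = await alist(numbers())
    finally:
        # As in the test, always wait for the background worker to finish.
        results = await gather(worker)
    return items, results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The try/finally shape mirrors the test: the background worker is awaited even if an assertion in the main body fails, so no task is left running after the test ends.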

Tested by

no test coverage detected