hub / github.com/psycopg/psycopg / test_stop_after_batch

Function test_stop_after_batch

tests/test_notify.py:183–200
(conn_cls, conn, dsn)

Source from the content-addressed store, hash-verified

@pytest.mark.timing
def test_stop_after_batch(conn_cls, conn, dsn):
    conn.set_autocommit(True)
    conn.execute("listen foo")

    def notifier():
        with conn_cls.connect(dsn, autocommit=True) as nconn:
            with nconn.transaction():
                nconn.execute("notify foo, '1'")
                nconn.execute("notify foo, '2'")

    worker = spawn(notifier)
    try:
        ns = list(conn.notifies(timeout=1.0, stop_after=1))
        assert len(ns) == 2
        assert ns[0].payload == "1"
        assert ns[1].payload == "2"
    finally:
        gather(worker)
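
The test asks for stop_after=1 yet asserts two notifications. Both NOTIFY commands run inside one transaction, so they are delivered together at commit, and the test expects the whole batch to be returned before the limit takes effect. The sketch below models that behaviour without a database. It is an illustration only: notifies_sketch and its batches argument are hypothetical names, not part of psycopg, and the loop is an assumption about the semantics the test exercises, not the library's actual implementation.

```python
from typing import Iterable, Iterator, List, Optional


def notifies_sketch(
    batches: Iterable[List[str]], stop_after: Optional[int] = None
) -> Iterator[str]:
    """Yield payloads batch by batch, checking stop_after only between batches.

    Hypothetical stand-in for illustration; not psycopg's implementation.
    """
    received = 0
    for batch in batches:
        # Every notification in a batch that has already arrived is delivered.
        for payload in batch:
            yield payload
            received += 1
        # The limit is checked only after the whole batch has been consumed.
        if stop_after is not None and received >= stop_after:
            return


# Two payloads arriving together: both come back even with stop_after=1.
same_batch = list(notifies_sketch([["1", "2"], ["3"]], stop_after=1))

# The same payloads arriving separately: the first batch satisfies the limit.
separate = list(notifies_sketch([["1"], ["2"]], stop_after=1))
```

Under these assumptions same_batch holds both payloads while separate holds only the first, which is why the test asserts len(ns) == 2 and not 1.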

Callers

nothing calls this directly

Calls 5

spawn · Function · 0.70
gather · Function · 0.70
set_autocommit · Method · 0.45
execute · Method · 0.45
notifies · Method · 0.45

Tested by

no test coverage detected