MCPcopy
hub / github.com/psycopg/psycopg / test_generator_and_handler

Function test_generator_and_handler

tests/test_notify.py:228–267  ·  view source on GitHub ↗
(conn, conn_cls, dsn, recwarn)

Source from the content-addressed store, hash-verified

226@pytest.mark.slow
227@skip_free_threaded("warnings are context-local in the free-threaded build >= 3.14")
228def test_generator_and_handler(conn, conn_cls, dsn, recwarn):
229 # NOTE: we don't support generator+handlers anymore. So, if in the future
230 # this behaviour will change, we will not consider it a regression. However
231 # we will want to keep the warning check.
232
233 recwarn.clear()
234
235 conn.set_autocommit(True)
236 conn.execute("listen foo")
237
238 n1 = None
239 n2 = None
240
241 def set_n2(n):
242 nonlocal n2
243 n2 = n
244
245 conn.add_notify_handler(set_n2)
246
247 def listener():
248 nonlocal n1
249 for n1 in conn.notifies(timeout=1, stop_after=1):
250 pass
251
252 worker = spawn(listener)
253 try:
254 # Make sure the listener is listening
255 if not conn.lock.locked():
256 sleep(0.01)
257
258 with conn_cls.connect(dsn, autocommit=True) as nconn:
259 nconn.execute("notify foo, '1'")
260 finally:
261 gather(worker)
262
263 assert n1
264 assert n2
265
266 msg = str(recwarn.pop(RuntimeWarning).message)
267 assert "notifies()" in msg
268
269
270@pytest.mark.parametrize("query_between", [True, False])

Callers

nothing calls this directly

Calls 7

add_notify_handlerMethod · 0.80
spawnFunction · 0.70
gatherFunction · 0.70
clearMethod · 0.45
set_autocommitMethod · 0.45
executeMethod · 0.45
connectMethod · 0.45

Tested by

no test coverage detected