hub / github.com/psycopg/psycopg / test_notify_handlers

Function test_notify_handlers

tests/test_notify.py:18–54
(conn)

Source from the content-addressed store, hash-verified

def test_notify_handlers(conn):
    nots1 = []
    nots2 = []

    def cb1(n):
        nots1.append(n)

    conn.add_notify_handler(cb1)
    conn.add_notify_handler(lambda n: nots2.append(n))

    conn.set_autocommit(True)
    conn.execute("listen foo")
    conn.execute("notify foo, 'n1'")

    assert len(nots1) == 1
    n = nots1[0]
    assert n.channel == "foo"
    assert n.payload == "n1"
    assert n.pid == conn.pgconn.backend_pid

    assert len(nots2) == 1
    assert nots2[0] == nots1[0]

    conn.remove_notify_handler(cb1)
    conn.execute("notify foo, 'n2'")

    assert len(nots1) == 1
    assert len(nots2) == 2
    n = nots2[1]
    assert isinstance(n, Notify)
    assert n.channel == "foo"
    assert n.payload == "n2"
    assert n.pid == conn.pgconn.backend_pid
    assert hash(n)

    with pytest.raises(ValueError):
        conn.remove_notify_handler(cb1)
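The behaviour this test pins down can be illustrated without a database. The sketch below is a minimal stand-in, not psycopg's implementation: `Notification`, `HandlerRegistry`, `add_handler`, `remove_handler`, and `dispatch` are hypothetical names invented for this example. It assumes handlers are kept in a plain list, so every registered callback receives the same notification object, a removed callback stops receiving them, and removing a callback that is not registered raises `ValueError`.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Notification:
    """Hypothetical stand-in for a notification (not psycopg's Notify)."""

    channel: str
    payload: str
    pid: int


class HandlerRegistry:
    """Hypothetical registry mirroring the add/remove semantics the test checks."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[Notification], None]] = []

    def add_handler(self, callback: Callable[[Notification], None]) -> None:
        self._handlers.append(callback)

    def remove_handler(self, callback: Callable[[Notification], None]) -> None:
        # list.remove raises ValueError if the callback is not registered.
        self._handlers.remove(callback)

    def dispatch(self, note: Notification) -> None:
        # Iterate over a copy so a handler may unregister itself safely.
        for callback in list(self._handlers):
            callback(note)
```

A frozen dataclass compares by value and is hashable, which matches the equality and `hash(n)` checks in the test; whether psycopg's `Notify` is implemented this way is not stated in the source.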

Callers

nothing calls this directly

Calls 4

add_notify_handler · Method · 0.80
remove_notify_handler · Method · 0.80
set_autocommit · Method · 0.45
execute · Method · 0.45

Tested by

no test coverage detected