MCPcopy
hub / github.com/psycopg/psycopg / test_changefeed

Function test_changefeed

tests/crdb/test_cursor_async.py:30–79  ·  view source on GitHub ↗
(aconn_cls, dsn, aconn, testfeed, fmt_out)

Source from the content-addressed store, hash-verified

@pytest.mark.slow
@pytest.mark.parametrize("fmt_out", pq.Format)
async def test_changefeed(aconn_cls, dsn, aconn, testfeed, fmt_out):
    """Stream a changefeed on a second connection and check what it emits.

    A background worker opens its own connection and streams
    ``experimental changefeed for <testfeed>``, pushing every row it receives
    onto a queue. The test body inserts and then deletes a record through
    ``aconn``, checks the matching feed rows, cancels the streaming query and
    expects the worker to signal the cancellation by queueing ``None``.
    """
    await aconn.set_autocommit(True)
    q = AQueue()

    async def worker():
        # Everything the worker has to report travels through the queue:
        # feed rows, None once the query has been canceled, or the exception
        # that interrupted it.
        try:
            async with await aconn_cls.connect(dsn, autocommit=True) as conn:
                cur = conn.cursor(binary=fmt_out, row_factory=namedtuple_row)
                try:
                    async for row in cur.stream(
                        f"experimental changefeed for {testfeed}"
                    ):
                        q.put_nowait(row)
                except e.QueryCanceled:
                    assert conn.info.transaction_status == pq.TransactionStatus.IDLE
                    q.put_nowait(None)
        except Exception as ex:
            q.put_nowait(ex)

    async def next_item():
        # Re-raise a failure reported by the worker instead of letting it be
        # mistaken for a row (which would fail with an unrelated
        # AttributeError and hide the real cause).
        item = await q.get()
        if isinstance(item, Exception):
            raise item
        return item

    t = spawn(worker)

    cur = aconn.cursor()
    await cur.execute(f"insert into {testfeed} (data) values ('hello') returning id")
    (key,) = await cur.fetchone()
    row = await next_item()
    assert row.table == testfeed
    assert json.loads(row.key) == [key]
    assert json.loads(row.value)["after"] == {"id": key, "data": "hello"}

    await cur.execute(f"delete from {testfeed} where id = %s", [key])
    row = await next_item()
    assert row.table == testfeed
    assert json.loads(row.key) == [key]
    assert json.loads(row.value)["after"] is None

    # Look up the id of the running statement (skipping the "show" query
    # itself) and cancel it.
    await cur.execute("select query_id from [show statements] where query !~ 'show'")
    (qid,) = await cur.fetchone()
    await cur.execute("cancel query %s", [qid])
    assert cur.statusmessage == "CANCEL QUERIES 1"

    # We often find the record with {"after": null} at least another time
    # in the queue. Let's tolerate an extra one.
    for _ in range(2):
        row = await next_item()
        if row is None:
            break
        # Show the unexpected row on failure.
        assert json.loads(row.value)["after"] is None, row
    else:
        pytest.fail("keep on receiving messages")

    await gather(t)

Callers

nothing calls this directly

Calls 9

AQueue (Class) · 0.50
spawn (Function) · 0.50
gather (Function) · 0.50
set_autocommit (Method) · 0.45
cursor (Method) · 0.45
execute (Method) · 0.45
fetchone (Method) · 0.45
get (Method) · 0.45
fail (Method) · 0.45

Tested by

no test coverage detected