| 28 | @pytest.mark.slow |
| 29 | @pytest.mark.parametrize("fmt_out", pq.Format) |
| 30 | async def test_changefeed(aconn_cls, dsn, aconn, testfeed, fmt_out): |
| 31 | await aconn.set_autocommit(True) |
| 32 | q = AQueue() |
| 33 | |
| 34 | async def worker(): |
| 35 | try: |
| 36 | async with await aconn_cls.connect(dsn, autocommit=True) as conn: |
| 37 | cur = conn.cursor(binary=fmt_out, row_factory=namedtuple_row) |
| 38 | try: |
| 39 | async for row in cur.stream( |
| 40 | f"experimental changefeed for {testfeed}" |
| 41 | ): |
| 42 | q.put_nowait(row) |
| 43 | except e.QueryCanceled: |
| 44 | assert conn.info.transaction_status == pq.TransactionStatus.IDLE |
| 45 | q.put_nowait(None) |
| 46 | except Exception as ex: |
| 47 | q.put_nowait(ex) |
| 48 | |
| 49 | t = spawn(worker) |
| 50 | |
| 51 | cur = aconn.cursor() |
| 52 | await cur.execute(f"insert into {testfeed} (data) values ('hello') returning id") |
| 53 | (key,) = await cur.fetchone() |
| 54 | row = await q.get() |
| 55 | assert row.table == testfeed |
| 56 | assert json.loads(row.key) == [key] |
| 57 | assert json.loads(row.value)["after"] == {"id": key, "data": "hello"} |
| 58 | |
| 59 | await cur.execute(f"delete from {testfeed} where id = %s", [key]) |
| 60 | row = await q.get() |
| 61 | assert row.table == testfeed |
| 62 | assert json.loads(row.key) == [key] |
| 63 | assert json.loads(row.value)["after"] is None |
| 64 | |
| 65 | await cur.execute("select query_id from [show statements] where query !~ 'show'") |
| 66 | (qid,) = await cur.fetchone() |
| 67 | await cur.execute("cancel query %s", [qid]) |
| 68 | assert cur.statusmessage == "CANCEL QUERIES 1" |
| 69 | |
| 70 | # We often find the record with {"after": null} at least another time |
| 71 | # in the queue. Let's tolerate an extra one. |
| 72 | for i in range(2): |
| 73 | if (row := (await q.get())) is None: |
| 74 | break |
| 75 | assert json.loads(row.value)["after"] is None, json |
| 76 | else: |
| 77 | pytest.fail("keep on receiving messages") |
| 78 | |
| 79 | await gather(t) |