hub / github.com/psycopg/psycopg / test_steal_cursor

Function test_steal_cursor

tests/test_cursor_server_async.py:550–562
(aconn, row_factory)

Source from the content-addressed store, hash-verified

@pytest.mark.parametrize("row_factory", ["tuple_row", "namedtuple_row"])
async def test_steal_cursor(aconn, row_factory):
    cur1 = aconn.cursor()
    await cur1.execute("declare test cursor for select generate_series(1, 6) as s")

    cur2 = aconn.cursor("test", row_factory=getattr(rows, row_factory))
    # can call fetch without execute
    rec = await cur2.fetchone()
    assert rec == (1,)
    if row_factory == "namedtuple_row":
        assert rec.s == 1
    assert await cur2.fetchmany(3) == [(2,), (3,), (4,)]
    assert await cur2.fetchall() == [(5,), (6,)]
    await cur2.close()
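The test compares `rec` to the plain tuple `(1,)` under both row factories, and reads `rec.s` only in the `namedtuple_row` case. Here is a minimal stdlib-only sketch of why both checks can pass, assuming the `namedtuple_row` factory yields `collections.namedtuple` instances. The `Row` class and `compare_rows` helper are hypothetical stand-ins for illustration, not psycopg types.

```python
from collections import namedtuple

# Hypothetical stand-in for a row built by a namedtuple-style row factory.
# The field name "s" mirrors the column alias in the declared cursor's query.
Row = namedtuple("Row", ["s"])


def compare_rows():
    """Return how a plain tuple row and a named row compare to (1,)."""
    plain = (1,)
    named = Row(s=1)
    # A namedtuple is a tuple subclass, so equality with a plain tuple
    # is decided element by element; the field name adds attribute access.
    return plain == (1,), named == (1,), named.s


results = compare_rows()
print(results)
```

This is why the equality assertions need no branching on `row_factory`, while attribute access such as `rec.s` is guarded by the `namedtuple_row` check.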

Callers

nothing calls this directly

Calls 6

cursor · Method · 0.45
execute · Method · 0.45
fetchone · Method · 0.45
fetchmany · Method · 0.45
fetchall · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected