MCPcopy
hub / github.com/psycopg/psycopg / test_row_factory

Function test_row_factory

tests/test_cursor_server_async.py:383–406  ·  view source on GitHub ↗
(aconn)

Source from the content-addressed store, hash-verified

381
382@pytest.mark.crdb_skip("scroll cursor")
383async def test_row_factory(aconn):
384 n = 0
385
386 def my_row_factory(cur):
387 nonlocal n
388 n += 1
389 return lambda values: [n] + [-v for v in values]
390
391 cur = aconn.cursor("foo", row_factory=my_row_factory, scrollable=True)
392 await cur.execute("select generate_series(1, 3) as x")
393 assert cur.rownumber == 0
394 recs = await cur.fetchall()
395 assert cur.rownumber == 3
396 await cur.scroll(0, "absolute")
397 assert cur.rownumber == 0
398 while rec := (await cur.fetchone()):
399 recs.append(rec)
400 assert cur.rownumber == 3
401 assert recs == [[1, -1], [1, -2], [1, -3]] * 2
402
403 await cur.scroll(0, "absolute")
404 cur.row_factory = rows.dict_row
405 assert await cur.fetchone() == {"x": 1}
406 await cur.close()
407
408
409async def test_rownumber(aconn):

Callers

nothing calls this directly

Calls 6

cursorMethod · 0.45
executeMethod · 0.45
fetchallMethod · 0.45
scrollMethod · 0.45
fetchoneMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…