MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / cursor_wrapper

Method cursor_wrapper

test/sql/test_resultset.py:1822–1842  ·  view source on GitHub ↗
(self, engine)

Source from the content-addressed store, hash-verified

1820
@contextmanager
def cursor_wrapper(self, engine):
    """Patch cursor creation on ``engine`` and count cursor attribute use.

    While the context is active, ``create_cursor`` on the engine dialect's
    ``execution_ctx_cls`` is replaced so that every cursor it produces is
    wrapped in a counting proxy.  The context yields a ``defaultdict(int)``
    mapping attribute name -> number of times that attribute was looked
    up on a wrapped cursor through the proxy.

    The patch is undone by ``patch.object`` when the ``with`` block exits.
    """
    ctx_cls = engine.dialect.execution_ctx_cls
    # Grab the unpatched factory up front so the replacement can delegate.
    original_create = ctx_cls.create_cursor
    access_counts = defaultdict(int)

    class CursorWrapper:
        """Proxy that records each attribute lookup it forwards."""

        def __init__(self, real_cursor):
            self.real_cursor = real_cursor

        def __getattr__(self, attr):
            # __getattr__ only fires for names not found normally, so
            # ``real_cursor`` itself is never counted.  The count is
            # bumped before delegating, so failed lookups count as well.
            access_counts[attr] += 1
            return getattr(self.real_cursor, attr)

    def wrapped_create(context):
        return CursorWrapper(original_create(context))

    patcher = patch.object(ctx_cls, "create_cursor", wrapped_create)
    with patcher:
        yield access_counts
1843
1844 def test_no_rowcount_on_selects_inserts(self, metadata, testing_engine):
1845 """assert that rowcount is only called on deletes and updates.

Calls 1

objectMethod · 0.80

Tested by

no test coverage detected