MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / _query_fixture

Method _query_fixture

test/ext/test_baked.py:1002–1041  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1000 return User, Address
1001
def _query_fixture(self):
    """Return a session whose ORM executions can be served from a cache.

    The session is created with a local ``Query`` subclass that stores a
    cache key in its execution options, and a ``do_orm_execute`` listener
    that looks that key up in ``CachingQuery.cache``.
    """
    from sqlalchemy.orm.query import Query

    class CachingQuery(Query):
        """Query subclass that tags itself with a cache key."""

        # Class-level dict; the class itself is re-created on every call
        # of this fixture, so each fixture call starts with an empty cache.
        cache = {}

        def set_cache_key(self, key):
            """Return a query carrying ``key`` as its ``_cache_key``."""
            return self.execution_options(_cache_key=key)

        def set_cache_key_for_path(self, path, key):
            """Return a query carrying ``key`` under a per-path option."""
            option_name = "_cache_key_%s" % path
            return self.execution_options(**{option_name: key})

    def fetch_or_create(cache_key, cache, createfunc):
        """Return a fresh result for ``cache_key``, populating on a miss.

        On a miss ``createfunc()`` is called and its ``freeze()`` value is
        stored; in both cases the stored object is invoked to produce the
        value handed back.
        """
        if cache_key not in cache:
            cache[cache_key] = createfunc().freeze()
        return cache[cache_key]()

    s1 = fixture_session(query_cls=CachingQuery)

    @event.listens_for(s1, "do_orm_execute", retval=True)
    def do_orm_execute(orm_context):
        """Serve the statement from the cache when a cache key is present."""
        ckey = None
        for option in orm_context.user_defined_options:
            ckey = option.get_cache_key(orm_context)
            if ckey:
                break
        else:
            # No user-defined option supplied a truthy key; fall back to
            # the key set via execution options, if there is one.
            exec_opts = orm_context.execution_options
            if "_cache_key" in exec_opts:
                ckey = exec_opts["_cache_key"]

        if ckey is None:
            # Returning None lets the statement execute normally.
            return None

        return fetch_or_create(
            ckey,
            CachingQuery.cache,
            orm_context.invoke_statement,
        )

    return s1
1042
1043 def _option_fixture(self):
1044 from sqlalchemy.orm.interfaces import UserDefinedOption

Callers 4

test_non_bakedMethod · 0.95
test_non_baked_tuplesMethod · 0.95
test_use_w_bakedMethod · 0.95

Calls 1

fixture_sessionFunction · 0.90

Tested by

no test coverage detected