MCPcopy Index your code
hub / github.com/sqlalchemy/sqlalchemy / _caching_session_fixture

Method _caching_session_fixture

test/orm/test_events.py:84–114  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

82 cls._setup_stock_mapping()
83
def _caching_session_fixture(self):
    """Return a new session whose ORM executions can be served from a cache.

    A ``sessionmaker`` bound to ``testing.db`` is created and a
    ``do_orm_execute`` listener (registered with ``retval=True``) is
    attached to it.  For each execution the listener resolves a cache key:

    * every entry in ``orm_context.user_defined_options`` is asked for a
      key via ``get_cache_key(orm_context)``; the first truthy key wins;
    * if no option yields a truthy key, a ``"cache_key"`` entry in
      ``orm_context.execution_options`` is used when present.

    When the resolved key is not ``None``, the statement is invoked at most
    once per key; the ``.freeze()`` of that result is stored and the stored
    object is called to produce the listener's return value.  Otherwise the
    listener returns ``None``.
    """
    session_factory = sessionmaker(testing.db, future=True)

    # Maps cache key -> frozen result produced by ``createfunc().freeze()``.
    frozen_by_key = {}

    def _resolve_cache_key(orm_context):
        # Mirrors a ``for ... else``: the fallback lookup only runs when
        # no option produced a truthy key.
        key = None
        for option in orm_context.user_defined_options:
            key = option.get_cache_key(orm_context)
            if key:
                return key

        exec_options = orm_context.execution_options
        if "cache_key" in exec_options:
            key = exec_options["cache_key"]
        # May still be the last (falsy) value an option returned.
        return key

    def _cached_or_create(key, createfunc):
        # Populate on first use, then always hand back a fresh call of
        # the stored frozen object.
        if key not in frozen_by_key:
            frozen_by_key[key] = createfunc().freeze()
        return frozen_by_key[key]()

    @event.listens_for(session_factory, "do_orm_execute", retval=True)
    def _serve_from_cache(orm_context):
        key = _resolve_cache_key(orm_context)
        if key is None:
            # No cache key applies to this execution.
            return None
        return _cached_or_create(key, orm_context.invoke_statement)

    return session_factory()
115
116 def test_cache_option(self):
117 User, Address = self.classes("User", "Address")

Callers 1

test_cache_option · Method · 0.95

Calls 1

sessionmaker · Class · 0.90

Tested by

no test coverage detected