MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_lambda_concurrency

Method test_lambda_concurrency

test/sql/test_lambdas.py:2195–2251  ·  view source on GitHub ↗
(self, testing_engine, mapping_fixture)

Source from the content-addressed store, hash-verified

2193
@testing.requires.timing_intensive
def test_lambda_concurrency(self, testing_engine, mapping_fixture):
    """Build and execute one large lambda statement from many threads.

    A function ``generate_lambda_stmt(wanted)`` is generated from source
    text; it starts from a ``lambda_stmt`` selecting ``A.col1`` ..
    ``A.col4`` and appends ``NUM_OF_LAMBDAS`` lambda ``where()`` criteria.
    ``self.THREADS`` threads then each build that statement with their own
    ``wanted`` value and execute it on their own connection.  The test
    passes only if every thread finished and fetched a row.

    :param testing_engine: factory called with ``options=`` to create the
        engine used by the test.
    :param mapping_fixture: object exposed to the generated code as ``A``;
        it must provide ``col1`` .. ``col4``.
    """
    A = mapping_fixture
    engine = testing_engine(options={"pool_size": self.THREADS + 5})
    NUM_OF_LAMBDAS = 150

    # The statement builder is generated as source text so that it holds
    # NUM_OF_LAMBDAS separate ``stmt += lambda ...`` lines.
    # NOTE(review): presumably each line must be its own lambda/code
    # object rather than one lambda reused in a loop -- confirm.
    where_line = (
        "    stmt += lambda s: s.where((A.col1 == wanted) & "
        "(A.col2 == wanted) & (A.col3 == wanted) & "
        "(A.col4 == wanted))\n"
    )
    code = (
        """
from sqlalchemy import lambda_stmt, select


def generate_lambda_stmt(wanted):
    stmt = lambda_stmt(lambda: select(A.col1, A.col2, A.col3, A.col4))
"""
        + where_line * NUM_OF_LAMBDAS
        + """
    return stmt
"""
    )

    # ``code`` is assembled entirely from literals in this test, so exec()
    # is not handling external input here.
    d = {"A": A, "__name__": "lambda_fake"}
    exec(code, d)
    generate_lambda_stmt = d["generate_lambda_stmt"]

    # One slot per thread: None = no result recorded (the thread raised or
    # was still running when join() timed out), True = a row was fetched,
    # False = the query returned no row.
    runs: List[Optional[bool]] = [None] * self.THREADS
    conns = []

    def run(num):
        wanted = str(num)
        connection = conns[num]
        # NOTE(review): the sleeps presumably give the threads a chance to
        # overlap while building and executing the statement -- confirm.
        time.sleep(0.1)
        stmt = generate_lambda_stmt(wanted)
        time.sleep(0.1)
        row = connection.execute(stmt).first()
        runs[num] = bool(row)

    try:
        # Open every connection before any thread starts; run() indexes
        # into ``conns`` by thread number.
        for _ in range(self.THREADS):
            conns.append(engine.connect())

        threads = [
            threading.Thread(target=run, args=(num,))
            for num in range(self.THREADS)
        ]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join(timeout=10)
    finally:
        # Close whatever was opened, even if connecting or starting a
        # thread raised part-way through.
        for conn in conns:
            conn.close()

    # Anything other than True is a failure.  An exception inside a thread
    # does not propagate to this thread, and join(timeout=...) returns
    # silently on timeout, so both cases show up only as a None slot.
    fails = sum(1 for r in runs if r is not True)
    assert not fails, f"{fails} runs failed"

Callers

nothing calls this directly

Calls 5

testing_engineFunction · 0.85
connectMethod · 0.45
startMethod · 0.45
joinMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected