(self, testing_engine, mapping_fixture)
| 2193 | |
| 2194 | @testing.requires.timing_intensive |
| 2195 | def test_lambda_concurrency(self, testing_engine, mapping_fixture): |
| 2196 | A = mapping_fixture |
| 2197 | engine = testing_engine(options={"pool_size": self.THREADS + 5}) |
| 2198 | NUM_OF_LAMBDAS = 150 |
| 2199 | |
| 2200 | code = """ |
| 2201 | from sqlalchemy import lambda_stmt, select |
| 2202 | |
| 2203 | |
| 2204 | def generate_lambda_stmt(wanted): |
| 2205 | stmt = lambda_stmt(lambda: select(A.col1, A.col2, A.col3, A.col4)) |
| 2206 | """ |
| 2207 | |
| 2208 | for _ in range(NUM_OF_LAMBDAS): |
| 2209 | code += ( |
| 2210 | " stmt += lambda s: s.where((A.col1 == wanted) & " |
| 2211 | "(A.col2 == wanted) & (A.col3 == wanted) & " |
| 2212 | "(A.col4 == wanted))\n" |
| 2213 | ) |
| 2214 | |
| 2215 | code += """ |
| 2216 | return stmt |
| 2217 | """ |
| 2218 | |
| 2219 | d = {"A": A, "__name__": "lambda_fake"} |
| 2220 | exec(code, d) |
| 2221 | generate_lambda_stmt = d["generate_lambda_stmt"] |
| 2222 | |
| 2223 | runs: List[Optional[int]] = [None for _ in range(self.THREADS)] |
| 2224 | conns = [engine.connect() for _ in range(self.THREADS)] |
| 2225 | |
| 2226 | def run(num): |
| 2227 | wanted = str(num) |
| 2228 | connection = conns[num] |
| 2229 | time.sleep(0.1) |
| 2230 | stmt = generate_lambda_stmt(wanted) |
| 2231 | time.sleep(0.1) |
| 2232 | row = connection.execute(stmt).first() |
| 2233 | if not row: |
| 2234 | runs[num] = False |
| 2235 | else: |
| 2236 | runs[num] = True |
| 2237 | |
| 2238 | threads = [ |
| 2239 | threading.Thread(target=run, args=(num,)) |
| 2240 | for num in range(self.THREADS) |
| 2241 | ] |
| 2242 | |
| 2243 | for thread in threads: |
| 2244 | thread.start() |
| 2245 | for thread in threads: |
| 2246 | thread.join(timeout=10) |
| 2247 | for conn in conns: |
| 2248 | conn.close() |
| 2249 | |
| 2250 | fails = len([r for r in runs if r is False]) |
| 2251 | assert not fails, f"{fails} runs failed" |
nothing calls this directly
no test coverage detected