| 2495 | |
@testing.combinations(True, False, argnames="close")
def test_close_parameter(self, testing_engine, close):
    """Check ``dispose(close=...)`` on a shallow copy of an engine.

    An engine limited to one pooled connection is created, a connection
    is checked out and closed, and the engine is then copied with
    ``copy.copy()``.  The copy is disposed with the ``close`` flag under
    test, after which the test asserts:

    * the copy hands out a DBAPI connection that is not the one first
      seen on the original engine, for either value of ``close``;
    * the original engine hands out a different DBAPI connection when
      ``close`` is true, and the very same one when ``close`` is false.

    :param testing_engine: fixture callable that builds the engine from
     the given ``options``.
    :param close: value passed through to ``dispose(close=...)``;
     supplied by ``testing.combinations``.
    """
    # A single-slot pool with no overflow; the pool class is chosen to
    # match whether the dialect under test is async.
    eng = testing_engine(
        options=dict(
            pool_size=1,
            max_overflow=0,
            poolclass=(
                QueuePool
                if not testing.db.dialect.is_async
                else AsyncAdaptedQueuePool
            ),
        )
    )

    # Remember the DBAPI connection the original engine produced first.
    conn = eng.connect()
    dbapi_conn_one = conn.connection.dbapi_connection
    conn.close()

    # Dispose the shallow copy, not the original engine.
    eng_copy = copy.copy(eng)
    eng_copy.dispose(close=close)

    copy_conn = eng_copy.connect()
    dbapi_conn_two = copy_conn.connection.dbapi_connection

    # After dispose, the copy must not reuse the first DBAPI connection.
    is_not(dbapi_conn_one, dbapi_conn_two)

    # The original engine only loses its first connection when the
    # copy's dispose was asked to close connections.
    conn = eng.connect()
    if close:
        is_not(dbapi_conn_one, conn.connection.dbapi_connection)
    else:
        is_(dbapi_conn_one, conn.connection.dbapi_connection)

    conn.close()
    copy_conn.close()
| 2531 | def test_retval_flag(self): |
| 2532 | canary = [] |