(dialect, conn_rec, cargs, cparams)
| 159 | |
| 160 | |
def _connect_with_retry(dialect, conn_rec, cargs, cparams):
    """Open a DBAPI connection, retrying when the connect attempt is refused.

    The connection is attempted up to five times.  An attempt is retried
    only when the driver raises ``DatabaseError`` / ``OperationalError``
    whose message contains ``DPY-6005`` or ``ORA-12516``; any other error
    of those types is re-raised immediately.

    :param dialect: object exposing ``driver`` (must equal ``"cx_oracle"``)
     and ``loaded_dbapi`` (the DBAPI module used to connect).
    :param conn_rec: not used by this function; accepted so the signature
     matches what callers pass.
    :param cargs: positional arguments forwarded to ``connect()``.
    :param cparams: keyword arguments forwarded to ``connect()``.
    :return: whatever ``dialect.loaded_dbapi.connect()`` returns.
    :raises Exception: ``"connect failed after five attempts"``, chained
     to the last driver error, when every attempt fails with a retryable
     error.
    """
    assert dialect.driver == "cx_oracle"

    max_attempts = 5
    retry_delay = 2  # seconds to wait between attempts

    def _is_couldnt_connect(err):
        # Retryable failures are recognised purely by the error code
        # embedded in the driver's message text.
        message = str(err)
        return "DPY-6005" in message or "ORA-12516" in message

    dbapi = dialect.loaded_dbapi
    last_err = None
    for attempt in range(1, max_attempts + 1):
        try:
            return dbapi.connect(*cargs, **cparams)
        except (dbapi.DatabaseError, dbapi.OperationalError) as err:
            if not _is_couldnt_connect(err):
                raise
            last_err = err
            # Only announce a reconnect and wait when another attempt will
            # actually follow; after the last failure we fail right away
            # instead of sleeping for nothing.
            if attempt < max_attempts:
                warn_test_suite("Oracle database reconnecting...")
                time.sleep(retry_delay)

    # Reaching this point means every attempt failed with a retryable error.
    raise Exception("connect failed after five attempts") from last_err
| 184 | |
| 185 | |
| 186 | @post_configure_testing_engine.for_db("oracle") |
nothing calls this directly
no test coverage detected