(cfg, eng, ident)
| 61 | |
@drop_db.for_db("postgresql")
def _pg_drop_db(cfg, eng, ident):
    """Drop the PostgreSQL database named ``ident``.

    A connection is obtained from ``eng`` with the AUTOCOMMIT isolation
    level.  Other backends belonging to the current user that are connected
    to the target database are terminated first, then ``DROP DATABASE`` is
    issued on the same connection.

    :param cfg: provisioning configuration; not used by this implementation.
    :param eng: engine used to open the administrative connection.
    :param ident: name of the database to drop.  It is interpolated into
     the DROP statement as-is, so it must be a trusted identifier.
    """
    # Everything except this connection's own backend, restricted to the
    # current user and to the database that is about to be dropped.
    terminate_backends = text(
        "select pg_terminate_backend(pid) from pg_stat_activity "
        "where usename=current_user and pid != pg_backend_pid() "
        "and datname=:dname"
    )
    # An identifier cannot be sent as a bound parameter, hence the
    # string formatting here.
    drop_statement = "DROP DATABASE %s" % ident

    # PostgreSQL refuses DROP DATABASE inside a transaction block, which is
    # why the connection is switched to AUTOCOMMIT before anything runs.
    autocommit_conn = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT"
    )
    with autocommit_conn as conn, conn.begin():
        conn.execute(terminate_backends, {"dname": ident})
        conn.exec_driver_sql(drop_statement)
| 75 | |
| 76 | |
| 77 | @temp_table_keyword_args.for_db("postgresql") |
nothing calls this directly
no test coverage detected