(self, suffix, verbosity, keepdb=False)
| 72 | ) |
| 73 | |
def _clone_test_db(self, suffix, verbosity, keepdb=False):
    """
    Create the clone of the test database identified by ``suffix``.

    For an on-disk source database, an existing file at the clone's path
    is deleted and the source file is copied over it. If ``keepdb`` is
    true and the clone file already exists, it is left untouched and
    nothing else is done. A failure to delete or copy is logged and the
    process exits with status 2.

    For an in-memory source database, the live connection is backed up to
    the clone target only when the multiprocessing start method is
    "forkserver" or "spawn"; otherwise nothing is done here.
    """
    source_database_name = self.connection.settings_dict["NAME"]
    target_database_name = self.get_test_db_clone_settings(suffix)["NAME"]
    if not self.is_in_memory_db(source_database_name):
        # Erase the old test database
        if os.access(target_database_name, os.F_OK):
            if keepdb:
                return
            if verbosity >= 1:
                self.log(
                    "Destroying old test database for alias %s..."
                    % (
                        self._get_database_display_str(
                            verbosity, target_database_name
                        ),
                    )
                )
            try:
                os.remove(target_database_name)
            except Exception as e:
                self.log("Got an error deleting the old test database: %s" % e)
                sys.exit(2)
        try:
            shutil.copy(source_database_name, target_database_name)
        except Exception as e:
            self.log("Got an error cloning the test database: %s" % e)
            sys.exit(2)
    # Forking automatically makes a copy of an in-memory database.
    # Forkserver and spawn require migrating to disk which will be
    # re-opened in setup_worker_connection.
    elif multiprocessing.get_start_method() in {"forkserver", "spawn"}:
        ondisk_db = sqlite3.connect(target_database_name, uri=True)
        try:
            self.connection.connection.backup(ondisk_db)
        finally:
            # Close the target connection even if the backup fails so the
            # handle is not leaked.
            ondisk_db.close()
| 108 | |
| 109 | def _destroy_test_db(self, test_database_name, verbosity): |
| 110 | if test_database_name and not self.is_in_memory_db(test_database_name): |
nothing calls this directly
no test coverage detected