| 60 | self._clone_db(source_database_name, target_database_name) |
| 61 | |
| 62 | def _clone_db(self, source_database_name, target_database_name): |
| 63 | cmd_args, cmd_env = DatabaseClient.settings_to_cmd_args_env( |
| 64 | self.connection.settings_dict, [] |
| 65 | ) |
| 66 | dump_cmd = [ |
| 67 | "mysqldump", |
| 68 | *cmd_args[1:-1], |
| 69 | "--routines", |
| 70 | "--events", |
| 71 | source_database_name, |
| 72 | ] |
| 73 | dump_env = load_env = {**os.environ, **cmd_env} if cmd_env else None |
| 74 | load_cmd = cmd_args |
| 75 | load_cmd[-1] = target_database_name |
| 76 | |
| 77 | with ( |
| 78 | subprocess.Popen( |
| 79 | dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=dump_env |
| 80 | ) as dump_proc, |
| 81 | subprocess.Popen( |
| 82 | load_cmd, |
| 83 | stdin=dump_proc.stdout, |
| 84 | stdout=subprocess.DEVNULL, |
| 85 | stderr=subprocess.PIPE, |
| 86 | env=load_env, |
| 87 | ) as load_proc, |
| 88 | ): |
| 89 | # Allow dump_proc to receive a SIGPIPE if the load process exits. |
| 90 | dump_proc.stdout.close() |
| 91 | dump_err = dump_proc.stderr.read().decode(errors="replace") |
| 92 | load_err = load_proc.stderr.read().decode(errors="replace") |
| 93 | if dump_proc.returncode != 0: |
| 94 | self.log( |
| 95 | f"Got an error on mysqldump when cloning the test database: {dump_err}" |
| 96 | ) |
| 97 | sys.exit(dump_proc.returncode) |
| 98 | if load_proc.returncode != 0: |
| 99 | self.log(f"Got an error cloning the test database: {load_err}") |
| 100 | sys.exit(load_proc.returncode) |