MCPcopy
hub / github.com/django/django / _clone_db

Method _clone_db

django/db/backends/mysql/creation.py:62–100  ·  view source on GitHub ↗
(self, source_database_name, target_database_name)

Source from the content-addressed store, hash-verified

60 self._clone_db(source_database_name, target_database_name)
61
62 def _clone_db(self, source_database_name, target_database_name):
63 cmd_args, cmd_env = DatabaseClient.settings_to_cmd_args_env(
64 self.connection.settings_dict, []
65 )
66 dump_cmd = [
67 "mysqldump",
68 *cmd_args[1:-1],
69 "--routines",
70 "--events",
71 source_database_name,
72 ]
73 dump_env = load_env = {**os.environ, **cmd_env} if cmd_env else None
74 load_cmd = cmd_args
75 load_cmd[-1] = target_database_name
76
77 with (
78 subprocess.Popen(
79 dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=dump_env
80 ) as dump_proc,
81 subprocess.Popen(
82 load_cmd,
83 stdin=dump_proc.stdout,
84 stdout=subprocess.DEVNULL,
85 stderr=subprocess.PIPE,
86 env=load_env,
87 ) as load_proc,
88 ):
89 # Allow dump_proc to receive a SIGPIPE if the load process exits.
90 dump_proc.stdout.close()
91 dump_err = dump_proc.stderr.read().decode(errors="replace")
92 load_err = load_proc.stderr.read().decode(errors="replace")
93 if dump_proc.returncode != 0:
94 self.log(
95 f"Got an error on mysqldump when cloning the test database: {dump_err}"
96 )
97 sys.exit(dump_proc.returncode)
98 if load_proc.returncode != 0:
99 self.log(f"Got an error cloning the test database: {load_err}")
100 sys.exit(load_proc.returncode)

Calls 5

closeMethod · 0.45
decodeMethod · 0.45
readMethod · 0.45
logMethod · 0.45