(self, src_path, dest_path, compression)
| 285 | _extension_to_compression = icom.extension_to_compression |
| 286 | |
| 287 | def compress_file(self, src_path, dest_path, compression): |
| 288 | if compression is None: |
| 289 | shutil.copyfile(src_path, dest_path) |
| 290 | return |
| 291 | |
| 292 | if compression == "gzip": |
| 293 | f = gzip.open(dest_path, "w") |
| 294 | elif compression == "bz2": |
| 295 | f = bz2.BZ2File(dest_path, "w") |
| 296 | elif compression == "zip": |
| 297 | with zipfile.ZipFile(dest_path, "w", compression=zipfile.ZIP_DEFLATED) as f: |
| 298 | f.write(src_path, os.path.basename(src_path)) |
| 299 | elif compression == "tar": |
| 300 | with open(src_path, "rb") as fh: |
| 301 | with tarfile.open(dest_path, mode="w") as tar: |
| 302 | tarinfo = tar.gettarinfo(src_path, os.path.basename(src_path)) |
| 303 | tar.addfile(tarinfo, fh) |
| 304 | elif compression == "xz": |
| 305 | import lzma |
| 306 | |
| 307 | f = lzma.LZMAFile(dest_path, "w") |
| 308 | elif compression == "zstd": |
| 309 | f = import_optional_dependency("zstandard").open(dest_path, "wb") |
| 310 | else: |
| 311 | msg = f"Unrecognized compression type: {compression}" |
| 312 | raise ValueError(msg) |
| 313 | |
| 314 | if compression not in ["zip", "tar"]: |
| 315 | with open(src_path, "rb") as fh: |
| 316 | with f: |
| 317 | f.write(fh.read()) |
| 318 | |
| 319 | def test_write_explicit(self, compression, get_random_path, temp_file): |
| 320 | p1 = temp_file.parent / f"{temp_file.stem}.compressed" |
no test coverage detected