Format the file at path `src`. Return True if changed. If `write_back` is DIFF or COLOR_DIFF, write a diff to stdout (colorized for COLOR_DIFF). If `write_back` is YES, write the reformatted code back to the file. The `mode` and `fast` options are passed to :func:`format_file_contents`.
(
src: Path,
fast: bool,
mode: Mode,
write_back: WriteBack = WriteBack.NO,
lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy
*,
lines: Collection[tuple[int, int]] = (),
)
| 953 | |
| 954 | |
| 955 | def format_file_in_place( |
| 956 | src: Path, |
| 957 | fast: bool, |
| 958 | mode: Mode, |
| 959 | write_back: WriteBack = WriteBack.NO, |
| 960 | lock: Any = None, # multiprocessing.Manager().Lock() is some crazy proxy |
| 961 | *, |
| 962 | lines: Collection[tuple[int, int]] = (), |
| 963 | ) -> bool: |
| 964 | """Format file under `src` path. Return True if changed. |
| 965 | |
| 966 | If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted |
| 967 | code to the file. |
| 968 | `mode` and `fast` options are passed to :func:`format_file_contents`. |
| 969 | """ |
| 970 | if src.suffix == ".pyi": |
| 971 | mode = replace(mode, is_pyi=True) |
| 972 | elif src.suffix == ".ipynb": |
| 973 | mode = replace(mode, is_ipynb=True) |
| 974 | |
| 975 | then = datetime.fromtimestamp(src.stat().st_mtime, timezone.utc) |
| 976 | header = b"" |
| 977 | with open(src, "rb") as buf: |
| 978 | if mode.skip_source_first_line: |
| 979 | header = buf.readline() |
| 980 | src_contents, encoding, newline = decode_bytes(buf.read(), mode) |
| 981 | try: |
| 982 | dst_contents = format_file_contents( |
| 983 | src_contents, fast=fast, mode=mode, lines=lines |
| 984 | ) |
| 985 | except NothingChanged: |
| 986 | return False |
| 987 | except JSONDecodeError: |
| 988 | raise ValueError( |
| 989 | f"File '{src}' cannot be parsed as valid Jupyter notebook." |
| 990 | ) from None |
| 991 | src_contents = header.decode(encoding) + src_contents |
| 992 | dst_contents = header.decode(encoding) + dst_contents |
| 993 | |
| 994 | if write_back == WriteBack.YES: |
| 995 | with open(src, "w", encoding=encoding, newline=newline) as f: |
| 996 | f.write(dst_contents) |
| 997 | elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF): |
| 998 | now = datetime.now(timezone.utc) |
| 999 | src_name = f"{src}\t{then}" |
| 1000 | dst_name = f"{src}\t{now}" |
| 1001 | if mode.is_ipynb: |
| 1002 | diff_contents = ipynb_diff(src_contents, dst_contents, src_name, dst_name) |
| 1003 | else: |
| 1004 | diff_contents = diff(src_contents, dst_contents, src_name, dst_name) |
| 1005 | |
| 1006 | if write_back == WriteBack.COLOR_DIFF: |
| 1007 | diff_contents = color_diff(diff_contents) |
| 1008 | |
| 1009 | with lock or nullcontext(): |
| 1010 | f = io.TextIOWrapper( |
| 1011 | sys.stdout.buffer, |
| 1012 | encoding=encoding, |