(index_filename: str, class_names: list[str], dry_run: bool)
| 218 | |
| 219 | |
def main(index_filename: str, class_names: list[str], dry_run: bool):
    """Sort the Python source files that define the given classes.

    Loads the JSON index at *index_filename*, maps every entry of
    *class_names* to a ``<package>/<module>.py`` path, and passes the
    resulting ``(class name, path)`` pairs to ``SortFileWorker.sort`` through
    a ``multiprocessing.Pool``. When more than one class name is given, a
    one-line summary is printed first.

    Args:
        index_filename: Path of the JSON index file. Its ``"classes"``
            mapping is consulted for every class name that contains no dot.
        class_names: Class names, either bare (``Name``, resolved through the
            index) or qualified as ``package.module.Name``.
        dry_run: Passed through unchanged to ``SortFileWorker``.

    Raises:
        ValueError: If a bare class name is missing from the index, or if a
            dotted name does not have the ``package.module.Name`` form.
    """
    if len(class_names) > 1:
        print(f"Sorting {len(class_names)} Python files")

    # Read the index as UTF-8 explicitly instead of relying on the locale's
    # default text encoding.
    with open(index_filename, encoding="utf-8") as r:
        index = json.load(r)

    filenames = {}
    for class_name in class_names:
        full_class_name = class_name
        if "." not in class_name:
            cls = index.get("classes", {}).get(class_name)
            if not cls:
                raise ValueError(f"Class {class_name} does not exist in index")
            full_class_name = f'{cls.get("package")}.{cls.get("module")}.{cls.get("name")}'

        # Only the first two dots separate package and module; anything after
        # them stays part of the class name.
        parts = full_class_name.split(".", maxsplit=2)
        if len(parts) != 3:
            # Without this check the unpacking below fails with a generic
            # "not enough values to unpack" that never names the bad input.
            raise ValueError(
                f"Class {class_name} must be given as package.module.Name"
            )
        package, module, short_name = parts
        filenames[short_name] = f"{package}/{module}.py"

    with multiprocessing.Manager() as manager:
        # NOTE(review): iterating `filenames` yields its keys, which are class
        # names rather than file paths, so the locks are keyed by class name.
        # Confirm that this is what SortFileWorker expects.
        locks = {filename: manager.Lock() for filename in filenames}
        locks["stdout"] = manager.Lock()

        # sort files in parallel
        worker = SortFileWorker(dry_run, locks)
        with multiprocessing.Pool() as pool:
            pool.map(worker.sort, iterable=filenames.items())
| 247 | |
| 248 | |
| 249 | def parse_args(): |
no test coverage detected
searching dependent graphs…