Parse files using a thread pool, parsing the sequential states on the calling thread while waiting for the parallel results. Trees from the new parser are left in raw (serialized) form. Return a (list, set) pair holding the parallel states that were actually parsed (not taken from the AST cache).
(
self, sequential_states: list[State], parallel_states: list[State]
)
| 1098 | state.setup_errors() |
| 1099 | |
def parse_files_threaded_raw(
    self, sequential_states: list[State], parallel_states: list[State]
) -> tuple[list[State], set[State]]:
    """Parse a batch of files, running the parallel ones on a thread pool.

    Every state in ``parallel_states`` is either restored from
    ``self.ast_cache`` or handed to a worker thread. While the workers
    run, the states in ``sequential_states`` are parsed on the calling
    thread. Trees from the new parser are left in raw (serialized) form.

    Returns a ``(list, set)`` pair containing the same parallel states:
    those that were submitted for parsing rather than served from the
    AST cache. Sequential states are not included in the result.
    """
    # Honour --num-workers when the user asked for more threads than detected.
    # Overhead from trying to parallelize (small) blocking portion of
    # parse_file_inner() results in no visible improvement with more than 8 threads.
    # TODO: reuse thread pool and/or batch small files in single submit() call.
    worker_count = min(max(get_available_threads(), self.options.num_workers), 8)

    pending = []
    # The list keeps a predictable order of errors; the set gives cheap
    # membership checks without sacrificing performance.
    parsed_in_order: list[State] = []
    parsed_lookup: set[State] = set()

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        for state in parallel_states:
            state.needs_parse = False

            if state.id in self.ast_cache:
                # Cache hit: restore the stored parse results, no worker needed.
                self.log(f"Using cached AST for {state.xpath} ({state.id})")
                cached_tree, cached_errors, cached_hash = self.ast_cache[state.id]
                state.tree = cached_tree
                state.early_errors = cached_errors
                state.source_hash = cached_hash
                continue

            self.log(f"Parsing {state.xpath} ({state.id})")
            if state.ignore_all or state.options.ignore_errors:
                self.errors.ignored_files.add(state.xpath)
            pending.append(pool.submit(state.parse_file_inner, ""))
            parsed_in_order.append(state)
            parsed_lookup.add(state)

        # Parse the sequential states before waiting on the parallel ones,
        # so this work overlaps with the pool's.
        for state in sequential_states:
            state.parse_file()

        # result() re-raises any exception raised inside a worker thread.
        finished, _ = wait(pending)
        for completed in finished:
            completed.result()

    return parsed_in_order, parsed_lookup
| 1144 | |
| 1145 | def post_parse_all(self, states: list[State]) -> None: |
| 1146 | for state in states: |
no test coverage detected