Process a module from the list of changed modules. Returns: Tuple with these items: - Updated list of pending changed modules as (module id, path) tuples - Module which was actually processed as (id, path) tuple - If there was a blocking erro
(
self,
changed_modules: list[tuple[str, str]],
initial_set: set[str],
removed_set: set[str],
blocking_error: str | None,
followed: bool,
)
| 333 | self.manager.ast_cache.clear() |
| 334 | |
| 335 | def update_one( |
| 336 | self, |
| 337 | changed_modules: list[tuple[str, str]], |
| 338 | initial_set: set[str], |
| 339 | removed_set: set[str], |
| 340 | blocking_error: str | None, |
| 341 | followed: bool, |
| 342 | ) -> tuple[list[tuple[str, str]], tuple[str, str], list[str] | None]: |
| 343 | """Process a module from the list of changed modules. |
| 344 | |
| 345 | Returns: |
| 346 | Tuple with these items: |
| 347 | |
| 348 | - Updated list of pending changed modules as (module id, path) tuples |
| 349 | - Module which was actually processed as (id, path) tuple |
| 350 | - If there was a blocking error, the error messages from it |
| 351 | """ |
| 352 | t0 = time.time() |
| 353 | next_id, next_path = changed_modules.pop(0) |
| 354 | |
| 355 | # If we have a module with a blocking error that is no longer |
| 356 | # in the import graph, we must skip it as otherwise we'll be |
| 357 | # stuck with the blocking error. |
| 358 | if ( |
| 359 | next_id == blocking_error |
| 360 | and next_id not in self.previous_modules |
| 361 | and next_id not in initial_set |
| 362 | ): |
| 363 | self.manager.log_fine_grained( |
| 364 | f"skip {next_id!r} (module with blocking error not in import graph)" |
| 365 | ) |
| 366 | return changed_modules, (next_id, next_path), None |
| 367 | |
| 368 | result = self.update_module(next_id, next_path, next_id in removed_set, followed) |
| 369 | remaining, (next_id, next_path), blocker_messages = result |
| 370 | changed_modules = [(id, path) for id, path in changed_modules if id != next_id] |
| 371 | changed_modules = dedupe_modules(remaining + changed_modules) |
| 372 | t1 = time.time() |
| 373 | |
| 374 | self.manager.log_fine_grained( |
| 375 | f"update once: {next_id} in {t1 - t0:.3f}s - {len(changed_modules)} left" |
| 376 | ) |
| 377 | |
| 378 | return changed_modules, (next_id, next_path), blocker_messages |
| 379 | |
| 380 | def update_module( |
| 381 | self, module: str, path: str, force_removed: bool, followed: bool |
no test coverage detected