Distribute TestCases across workers. Return an identifier of each TestCase with its result in order to use imap_unordered to show results as soon as they're available. To minimize pickling errors when getting results from workers: (1) pass back numeric indexes in self.subsuites instead of tests; (2) make tracebacks picklable with tblib, if available.
(self, result)
| 539 | super().__init__() |
| 540 | |
| 541 | def run(self, result): |
| 542 | """ |
| 543 | Distribute TestCases across workers. |
| 544 | |
| 545 | Return an identifier of each TestCase with its result in order to use |
| 546 | imap_unordered to show results as soon as they're available. |
| 547 | |
| 548 | To minimize pickling errors when getting results from workers: |
| 549 | |
| 550 | - pass back numeric indexes in self.subsuites instead of tests |
| 551 | - make tracebacks picklable with tblib, if available |
| 552 | |
| 553 | Even with tblib, errors may still occur for dynamically created |
| 554 | exception classes which cannot be unpickled. |
| 555 | """ |
| 556 | self.initialize_suite() |
| 557 | counter = multiprocessing.Value(ctypes.c_int, 0) |
| 558 | args = [ |
| 559 | (self.runner_class, index, subsuite, self.failfast, self.buffer) |
| 560 | for index, subsuite in enumerate(self.subsuites) |
| 561 | ] |
| 562 | # Don't buffer in the main process to avoid error propagation issues. |
| 563 | result.buffer = False |
| 564 | |
| 565 | with multiprocessing.Pool( |
| 566 | processes=self.processes, |
| 567 | initializer=functools.partial(_safe_init_worker, self.init_worker.__func__), |
| 568 | initargs=[ |
| 569 | counter, |
| 570 | self.initial_settings, |
| 571 | self.serialized_contents, |
| 572 | self.process_setup.__func__, |
| 573 | self.process_setup_args, |
| 574 | self.debug_mode, |
| 575 | self.used_aliases, |
| 576 | ], |
| 577 | ) as pool: |
| 578 | test_results = pool.imap_unordered(self.run_subsuite.__func__, args) |
| 579 | |
| 580 | while True: |
| 581 | if result.shouldStop: |
| 582 | pool.terminate() |
| 583 | break |
| 584 | |
| 585 | try: |
| 586 | subsuite_index, events = test_results.next(timeout=0.1) |
| 587 | except multiprocessing.TimeoutError as err: |
| 588 | if counter.value < 0: |
| 589 | err.add_note("ERROR: _init_worker failed, see prior traceback") |
| 590 | raise |
| 591 | continue |
| 592 | except StopIteration: |
| 593 | pool.close() |
| 594 | break |
| 595 | |
| 596 | tests = list(self.subsuites[subsuite_index]) |
| 597 | for event in events: |
| 598 | self.handle_event(result, tests, event) |