| 526 | |
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool, cache):
    """Forward queued task batches through ``put`` until told to stop.

    Entries are pulled from ``taskqueue`` until a ``None`` sentinel is
    read; each entry unpacks to a ``(taskseq, set_length)`` pair.  Every
    task in ``taskseq`` is handed to ``put``.  If ``put`` raises, the
    exception is stored as a ``(False, exc)`` result via
    ``cache[job]._set`` (a ``KeyError`` from that call is ignored).  Once
    a batch has been fully forwarded and ``set_length`` is truthy, it is
    called with the last task's index plus one (``0`` when no task was
    seen).

    Forwarding stops early when the current thread's ``_state`` is no
    longer ``RUN``.  After the loop ends, ``None`` is put on ``outqueue``
    and ``put(None)`` is called once per entry in ``pool``; an ``OSError``
    raised while doing so is logged and otherwise ignored.
    """
    handler_thread = threading.current_thread()

    for taskseq, set_length in iter(taskqueue.get, None):
        task = None
        try:
            stopped_early = False
            for task in taskseq:
                if handler_thread._state != RUN:
                    util.debug('task handler found thread._state != RUN')
                    stopped_early = True
                    break
                try:
                    put(task)
                except Exception as exc:
                    # Report the failure on the owning job, if it is
                    # still present in the cache.
                    job, position = task[:2]
                    try:
                        cache[job]._set(position, (False, exc))
                    except KeyError:
                        pass

            if stopped_early:
                # Leave the outer loop too; this skips its ``else`` clause.
                break

            if set_length:
                util.debug('doing set_length()')
                if task:
                    last_index = task[1]
                else:
                    last_index = -1
                set_length(last_index + 1)
        finally:
            # Drop references held by this iteration before the next
            # (possibly blocking) ``taskqueue.get`` call.
            task = taskseq = job = None
    else:
        util.debug('task handler got sentinel')

    try:
        # Result handler first: it should finish once the cache is empty.
        util.debug('task handler sending sentinel to result handler')
        outqueue.put(None)

        # Then one sentinel per pool entry: there is no more work.
        util.debug('task handler sending sentinel to workers')
        for _ in pool:
            put(None)
    except OSError:
        util.debug('task handler got OSError when sending sentinels')

    util.debug('task handler exiting')
| 572 | |
| 573 | @staticmethod |
| 574 | def _handle_results(outqueue, get, cache): |