Helper function to implement map, starmap and their async counterparts.
(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None)
| 469 | error_callback) |
| 470 | |
| 471 | def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, |
| 472 | error_callback=None): |
| 473 | ''' |
| 474 | Helper function to implement map, starmap and their async counterparts. |
| 475 | ''' |
| 476 | self._check_running() |
| 477 | if not hasattr(iterable, '__len__'): |
| 478 | iterable = list(iterable) |
| 479 | |
| 480 | if chunksize is None: |
| 481 | chunksize, extra = divmod(len(iterable), len(self._pool) * 4) |
| 482 | if extra: |
| 483 | chunksize += 1 |
| 484 | if len(iterable) == 0: |
| 485 | chunksize = 0 |
| 486 | |
| 487 | task_batches = Pool._get_tasks(func, iterable, chunksize) |
| 488 | result = MapResult(self, chunksize, len(iterable), callback, |
| 489 | error_callback=error_callback) |
| 490 | self._taskqueue.put( |
| 491 | ( |
| 492 | self._guarded_task_generation(result._job, |
| 493 | mapper, |
| 494 | task_batches), |
| 495 | None |
| 496 | ) |
| 497 | ) |
| 498 | return result |
| 499 | |
| 500 | @staticmethod |
| 501 | def _wait_for_updates(sentinels, change_notifier, timeout=None): |
no test coverage detected