Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. This function is only used in :meth:`scrapy.core.scraper.Scraper.handle_spider_output_async` and so it assumes that neither *callable* nor iterating *iterable* will raise an exception.
(
iterable: Iterable[_T] | AsyncIterator[_T],
count: int,
callable_: Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]],
*args: _P.args,
**kwargs: _P.kwargs,
)
| 93 | |
| 94 | |
async def _parallel_asyncio(
    iterable: Iterable[_T] | AsyncIterator[_T],
    count: int,
    callable_: Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]],
    *args: _P.args,
    **kwargs: _P.kwargs,
) -> None:
    """Execute a callable over the objects in the given iterable, in parallel,
    using no more than ``count`` concurrent calls.

    This function is only used in
    :meth:`scrapy.core.scraper.Scraper.handle_spider_output_async` and so it
    assumes that neither *callable* nor iterating *iterable* will raise an
    exception.

    ``None`` values produced by *iterable* are passed to *callable_* like any
    other object; they do not stop processing.

    :param iterable: sync or async source of the objects to process
    :param count: number of worker tasks to start, i.e. the maximum number of
        concurrent *callable_* calls
    :param callable_: coroutine function awaited as
        ``callable_(item, *args, **kwargs)`` for every object
    :param args: extra positional arguments forwarded to *callable_*
    :param kwargs: extra keyword arguments forwarded to *callable_*
    """
    # Private end-of-input marker. ``None`` must not be used for this: a
    # ``None`` coming from *iterable* would make a worker exit early, and
    # ``count`` of them would stop every worker while fill_queue() could stay
    # blocked forever on a full queue.
    stop = object()
    # The queue is bounded, so fill_queue() pauses in put() whenever the
    # workers fall behind instead of buffering the whole iterable.
    queue: asyncio.Queue[Any] = asyncio.Queue(count * 2)

    async def worker() -> None:
        while True:
            item = await queue.get()
            if item is stop:
                break
            try:
                await callable_(item, *args, **kwargs)
            finally:
                # Keeps the queue's unfinished-task counter balanced even if
                # the call fails; nothing in this function calls join().
                queue.task_done()

    async def fill_queue() -> None:
        async for item in as_async_generator(iterable):
            await queue.put(item)
        # One marker per worker so that each of them leaves its loop.
        for _ in range(count):
            await queue.put(stop)

    fill_task = asyncio.create_task(fill_queue())
    work_tasks = [asyncio.create_task(worker()) for _ in range(count)]
    # Returns once the producer and all workers are done; asyncio.wait() does
    # not re-raise exceptions raised inside the tasks.
    await asyncio.wait([fill_task, *work_tasks])
| 131 | |
| 132 | |
| 133 | class AsyncioLoopingCall: |