MCPcopy
hub / github.com/scrapy/scrapy / _parallel_asyncio

Function _parallel_asyncio

scrapy/utils/asyncio.py:95–130  ·  view source on GitHub ↗

Execute a callable over the objects in the given iterable, in parallel, using no more than ``count`` concurrent calls. This function is only used in :meth:`scrapy.core.scraper.Scraper.handle_spider_output_async` and so it assumes that neither *callable* nor iterating *iterable* will raise an exception.

(
    iterable: Iterable[_T] | AsyncIterator[_T],
    count: int,
    callable_: Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]],
    *args: _P.args,
    **kwargs: _P.kwargs,
)

Source from the content-addressed store, hash-verified

93
94
async def _parallel_asyncio(
    iterable: Iterable[_T] | AsyncIterator[_T],
    count: int,
    callable_: Callable[Concatenate[_T, _P], Coroutine[Any, Any, None]],
    *args: _P.args,
    **kwargs: _P.kwargs,
) -> None:
    """Execute a callable over the objects in the given iterable, in parallel,
    using no more than ``count`` concurrent calls.

    Items travel through a bounded queue of ``count * 2`` slots that is filled
    by one task and drained by ``count`` worker tasks, so *iterable* is
    consumed lazily and at most ``count`` calls of *callable_* are awaited at
    any moment.

    This function is only used in
    :meth:`scrapy.core.scraper.Scraper.handle_spider_output_async` and so it
    assumes that neither *callable_* nor iterating *iterable* will raise an
    exception.

    :param iterable: synchronous or asynchronous source of the objects to
        process. ``None`` is an ordinary item and is passed to *callable_*
        like any other object.
    :param count: number of worker tasks, i.e. the concurrency limit. It is
        expected to be positive: with ``count <= 0`` no worker is started and
        nothing is processed.
    :param callable_: coroutine function awaited once per item as
        ``callable_(item, *args, **kwargs)``.
    :param args: extra positional arguments forwarded to *callable_*.
    :param kwargs: extra keyword arguments forwarded to *callable_*.
    """
    # A private sentinel, compared by identity, marks the end of the input.
    # Using ``None`` for that purpose would make a ``None`` item coming from
    # *iterable* stop a worker early and, once every worker has stopped, leave
    # the filler blocked forever on a full queue.
    stop = object()
    queue: asyncio.Queue[Any] = asyncio.Queue(count * 2)

    async def worker() -> None:
        while True:
            item = await queue.get()
            if item is stop:
                break
            try:
                await callable_(item, *args, **kwargs)
            finally:
                queue.task_done()

    async def fill_queue() -> None:
        async for item in as_async_generator(iterable):
            # Blocks while the queue is full, which throttles the producer.
            await queue.put(item)
        # One stop marker per worker so that each of them terminates.
        for _ in range(count):
            await queue.put(stop)

    fill_task = asyncio.create_task(fill_queue())
    work_tasks = [asyncio.create_task(worker()) for _ in range(count)]
    await asyncio.wait([fill_task, *work_tasks])
131
132
133class AsyncioLoopingCall:

Callers 3

test_simpleMethod · 0.90
test_delaysMethod · 0.90

Calls 3

fill_queueFunction · 0.85
workerFunction · 0.85
waitMethod · 0.80

Tested by 2

test_simpleMethod · 0.72
test_delaysMethod · 0.72