
Class _AsyncCooperatorAdapter

scrapy/utils/defer.py:176–277 in github.com/scrapy/scrapy

A class that wraps an async iterable into a normal iterator suitable for use in Cooperator.coiterate(). As it's only needed for parallel_async(), it calls the callable directly in the callback instead of providing a more generic interface. On the outside, this class behaves as an iterator that yields Deferreds.
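The class docstring explains that this adapter exists because async generators (since Python 3.8) refuse a second concurrent `__anext__()`. The minimal sketch below shows that failure in plain asyncio rather than Twisted; `ticker` and `main` are illustrative names, not part of Scrapy. Two `__anext__()` calls are started at once, the way a naive parallel consumer would start them.

```python
import asyncio


async def ticker(n):
    # Illustrative async generator that suspends inside its body before each yield.
    for i in range(n):
        await asyncio.sleep(0.01)
        yield i


async def main():
    gen = ticker(3)
    # The first __anext__() suspends inside asyncio.sleep(); the second one finds
    # the generator still running and raises RuntimeError instead of queueing.
    results = await asyncio.gather(
        gen.__anext__(), gen.__anext__(), return_exceptions=True
    )
    await gen.aclose()
    return results


results = asyncio.run(main())
print(results)
```

The first call yields `0`, while the second comes back as a `RuntimeError`, which is why the adapter keeps only one `__anext__()` in flight at a time.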


```python
class _AsyncCooperatorAdapter(Iterator[Deferred[Any]], Generic[_T]):
    """A class that wraps an async iterable into a normal iterator suitable
    for using in Cooperator.coiterate(). As it's only needed for parallel_async(),
    it calls the callable directly in the callback, instead of providing a more
    generic interface.

    On the outside, this class behaves as an iterator that yields Deferreds.
    Each Deferred is fired with the result of the callable which was called on
    the next result from aiterator. It raises StopIteration when aiterator is
    exhausted, as expected.

    Cooperator calls __next__() multiple times and waits on the Deferreds
    returned from it. As async generators (since Python 3.8) don't support
    awaiting on __anext__() several times in parallel, we need to serialize
    this. It's done by storing the Deferreds returned from __next__() and
    firing the oldest one when a result from __anext__() is available.

    The workflow:
    1. When __next__() is called for the first time, it creates a Deferred, stores it
    in self.waiting_deferreds and returns it. It also makes a Deferred that will wait
    for self.aiterator.__anext__() and puts it into self.anext_deferred.
    2. If __next__() is called again before self.anext_deferred fires, more Deferreds
    are added to self.waiting_deferreds.
    3. When self.anext_deferred fires, it either calls _callback() or _errback(). Both
    clear self.anext_deferred.
    3.1. _callback() calls the callable passing the result value that it takes, pops a
    Deferred from self.waiting_deferreds, and if the callable result was a Deferred, it
    chains those Deferreds so that the waiting Deferred will fire when the result
    Deferred does, otherwise it fires it directly. This causes one awaiting task to
    receive a result. If self.waiting_deferreds is still not empty, new __anext__() is
    called and self.anext_deferred is populated.
    3.2. _errback() checks the exception class. If it's StopAsyncIteration it means
    self.aiterator is exhausted and so it sets self.finished and fires all
    self.waiting_deferreds. Other exceptions are propagated.
    4. If __next__() is called after __anext__() was handled, then if self.finished is
    True, it raises StopIteration, otherwise it acts like in step 2, but if
    self.anext_deferred is now empty it also populates it with a new __anext__().

    Note that CooperativeTask ignores the value returned from the Deferred that it waits
    for, so we fire them with None when needed.

    It may be possible to write an async iterator-aware replacement for
    Cooperator/CooperativeTask and use it instead of this adapter to achieve the same
    goal.
    """

    def __init__(
        self,
        aiterable: AsyncIterator[_T],
        callable_: Callable[Concatenate[_T, _P], Deferred[Any] | None],
        *callable_args: _P.args,
        **callable_kwargs: _P.kwargs,
    ):
        self.aiterator: AsyncIterator[_T] = aiterable.__aiter__()
        self.callable: Callable[Concatenate[_T, _P], Deferred[Any] | None] = callable_
        self.callable_args: tuple[Any, ...] = callable_args
        self.callable_kwargs: dict[str, Any] = callable_kwargs
        self.finished: bool = False
```
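The workflow in the docstring can be mirrored outside Twisted. The sketch below is a hypothetical asyncio analogue, not Scrapy's implementation: asyncio futures stand in for Deferreds, a task stands in for `self.anext_deferred`, and `SerialAnextAdapter`, `consume` and `demo` are illustrative names. Several consumers call `next()` concurrently, but only one `__anext__()` is in flight at a time, and each item fires the oldest waiting future. Routing non-`StopAsyncIteration` errors to the oldest waiter is this sketch's own choice.

```python
import asyncio
import inspect
from collections import deque


class SerialAnextAdapter:
    """Hypothetical asyncio analogue of the adapter's bookkeeping (not Scrapy code)."""

    def __init__(self, aiterable, callable_, *args, **kwargs):
        self.aiterator = aiterable.__aiter__()
        self.callable, self.args, self.kwargs = callable_, args, kwargs
        self.waiting = deque()   # futures returned by __next__(), oldest first
        self.anext_task = None   # the single in-flight __anext__() call
        self.finished = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.finished:  # step 4: source exhausted
            raise StopIteration
        fut = asyncio.get_running_loop().create_future()
        self.waiting.append(fut)  # steps 1-2: hand out and remember a future
        if self.anext_task is None:  # never overlap __anext__() calls
            self._schedule_anext()
        return fut

    def _schedule_anext(self):
        self.anext_task = asyncio.ensure_future(self._anext())
        self.anext_task.add_done_callback(self._on_anext_done)

    async def _anext(self):
        return await self.aiterator.__anext__()

    def _on_anext_done(self, task):
        self.anext_task = None
        exc = task.exception()
        if isinstance(exc, StopAsyncIteration):
            # Step 3.2: exhausted, so release every waiter with None.
            self.finished = True
            while self.waiting:
                self.waiting.popleft().set_result(None)
            return
        waiter = self.waiting.popleft()  # the oldest waiter gets this item
        if exc is not None:
            waiter.set_exception(exc)  # this sketch's choice for other errors
        else:
            self._run_callable(task.result(), waiter)  # step 3.1
        if self.waiting:  # someone is still waiting: fetch the next item
            self._schedule_anext()

    def _run_callable(self, value, waiter):
        try:
            result = self.callable(value, *self.args, **self.kwargs)
        except Exception as e:
            waiter.set_exception(e)
            return
        if inspect.isawaitable(result):
            # Chain: the waiter fires when the callable's awaitable does.
            inner = asyncio.ensure_future(result)
            inner.add_done_callback(lambda f: self._relay(f, waiter))
        else:
            waiter.set_result(None)

    @staticmethod
    def _relay(inner, waiter):
        exc = inner.exception()
        if exc is not None:
            waiter.set_exception(exc)
        else:
            waiter.set_result(None)


async def consume(adapter):
    # Plays the role of one CooperativeTask: take a future, wait on it, repeat.
    for fut in adapter:
        await fut


async def demo(count=3):
    async def source():
        for i in range(6):
            await asyncio.sleep(0)
            yield i

    seen = []
    adapter = SerialAnextAdapter(source(), seen.append)
    await asyncio.gather(*(consume(adapter) for _ in range(count)))
    return seen


seen = asyncio.run(demo())
print(seen)
```

Three consumers run in parallel, yet the async generator never sees overlapping `__anext__()` calls, so no `RuntimeError` occurs and the callable receives the items in source order. Waiters are fired with `None`, matching the docstring's note that the cooperative task ignores the value.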

Callers 1

parallel_async · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected