A class that wraps an async iterable into a normal iterator suitable for using in Cooperator.coiterate(). As it's only needed for parallel_async(), it calls the callable directly in the callback, instead of providing a more generic interface. On the outside, this class behaves as an
| 174 | |
| 175 | |
| 176 | class _AsyncCooperatorAdapter(Iterator[Deferred[Any]], Generic[_T]): |
| 177 | """A class that wraps an async iterable into a normal iterator suitable |
| 178 | for using in Cooperator.coiterate(). As it's only needed for parallel_async(), |
| 179 | it calls the callable directly in the callback, instead of providing a more |
| 180 | generic interface. |
| 181 | |
| 182 | On the outside, this class behaves as an iterator that yields Deferreds. |
| 183 | Each Deferred is fired with the result of the callable which was called on |
| 184 | the next result from aiterator. It raises StopIteration when aiterator is |
| 185 | exhausted, as expected. |
| 186 | |
| 187 | Cooperator calls __next__() multiple times and waits on the Deferreds |
| 188 | returned from it. As async generators (since Python 3.8) don't support |
| 189 | awaiting on __anext__() several times in parallel, we need to serialize |
| 190 | this. It's done by storing the Deferreds returned from __next__() and |
| 191 | firing the oldest one when a result from __anext__() is available. |
| 192 | |
| 193 | The workflow: |
| 194 | 1. When __next__() is called for the first time, it creates a Deferred, stores it |
| 195 | in self.waiting_deferreds and returns it. It also makes a Deferred that will wait |
| 196 | for self.aiterator.__anext__() and puts it into self.anext_deferred. |
| 197 | 2. If __next__() is called again before self.anext_deferred fires, more Deferreds |
| 198 | are added to self.waiting_deferreds. |
| 199 | 3. When self.anext_deferred fires, it either calls _callback() or _errback(). Both |
| 200 | clear self.anext_deferred. |
| 201 | 3.1. _callback() calls the callable passing the result value that it takes, pops a |
| 202 | Deferred from self.waiting_deferreds, and if the callable result was a Deferred, it |
| 203 | chains those Deferreds so that the waiting Deferred will fire when the result |
| 204 | Deferred does, otherwise it fires it directly. This causes one awaiting task to |
| 205 | receive a result. If self.waiting_deferreds is still not empty, new __anext__() is |
| 206 | called and self.anext_deferred is populated. |
| 207 | 3.2. _errback() checks the exception class. If it's StopAsyncIteration it means |
| 208 | self.aiterator is exhausted and so it sets self.finished and fires all |
| 209 | self.waiting_deferreds. Other exceptions are propagated. |
| 210 | 4. If __next__() is called after __anext__() was handled, then if self.finished is |
| 211 | True, it raises StopIteration, otherwise it acts like in step 2, but if |
| 212 | self.anext_deferred is now empty is also populates it with a new __anext__(). |
| 213 | |
| 214 | Note that CooperativeTask ignores the value returned from the Deferred that it waits |
| 215 | for, so we fire them with None when needed. |
| 216 | |
| 217 | It may be possible to write an async iterator-aware replacement for |
| 218 | Cooperator/CooperativeTask and use it instead of this adapter to achieve the same |
| 219 | goal. |
| 220 | """ |
| 221 | |
| 222 | def __init__( |
| 223 | self, |
| 224 | aiterable: AsyncIterator[_T], |
| 225 | callable_: Callable[Concatenate[_T, _P], Deferred[Any] | None], |
| 226 | *callable_args: _P.args, |
| 227 | **callable_kwargs: _P.kwargs, |
| 228 | ): |
| 229 | self.aiterator: AsyncIterator[_T] = aiterable.__aiter__() |
| 230 | self.callable: Callable[Concatenate[_T, _P], Deferred[Any] | None] = callable_ |
| 231 | self.callable_args: tuple[Any, ...] = callable_args |
| 232 | self.callable_kwargs: dict[str, Any] = callable_kwargs |
| 233 | self.finished: bool = False |