MCPcopy
hub / github.com/scrapy/scrapy / get_async_iterable_with_delays

Method get_async_iterable_with_delays

tests/test_utils_defer.py:188–198  ·  tests/test_utils_defer.py::TestParallelAsync.get_async_iterable_with_delays
(length: int)

Source from the content-addressed store, hash-verified

186
187 @staticmethod
188 async def get_async_iterable_with_delays(length: int) -> AsyncGenerator[int, None]:
189 class="cm"># simulate a callback with delays between some of the results
190 from twisted.internet import reactor
191
192 for i in range(length):
193 if random.random() < 0.1:
194 dfd: Deferred[None] = Deferred()
195 delay = random.random() / 20
196 reactor.callLater(delay, dfd.callback, None)
197 await maybe_deferred_to_future(dfd)
198 yield i
199
200 @inline_callbacks_test
201 def test_simple(self):

Callers 1

test_delaysMethod · 0.95

Calls 1

maybe_deferred_to_futureFunction · 0.90

Tested by

no test coverage detected