MCPcopy
hub / github.com/scrapy/scrapy / test_delays

Method test_delays

tests/test_utils_defer.py:221–238  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

219
220 @inline_callbacks_test
221 def test_delays(self):
222 for length in [20, 50, 100]:
223 parallel_count = [0]
224 max_parallel_count = [0]
225 results = []
226 ait = self.get_async_iterable_with_delays(length)
227 dl = parallel_async(
228 ait,
229 self.CONCURRENT_ITEMS,
230 self.callable_wrapped,
231 results,
232 parallel_count,
233 max_parallel_count,
234 )
235 yield dl
236 assert list(range(length)) == sorted(results)
237 assert parallel_count[0] == 0
238 assert max_parallel_count[0] <= self.CONCURRENT_ITEMS, max_parallel_count[0]
239
240
241class TestDeferredFromCoro:

Callers

nothing calls this directly

Calls 2

parallel_asyncFunction · 0.90

Tested by

no test coverage detected