MCPcopy
hub / github.com/scrapy/scrapy / test_simple

Method test_simple

tests/test_utils_defer.py:201–218  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

199
200 @inline_callbacks_test
201 def test_simple(self):
202 for length in [20, 50, 100]:
203 parallel_count = [0]
204 max_parallel_count = [0]
205 results = []
206 ait = self.get_async_iterable(length)
207 dl = parallel_async(
208 ait,
209 self.CONCURRENT_ITEMS,
210 self.callable_wrapped,
211 results,
212 parallel_count,
213 max_parallel_count,
214 )
215 yield dl
216 assert list(range(length)) == sorted(results)
217 assert parallel_count[0] == 0
218 assert max_parallel_count[0] <= self.CONCURRENT_ITEMS, max_parallel_count[0]
219
220 @inline_callbacks_test
221 def test_delays(self):

Callers

nothing calls this directly

Calls 2

get_async_iterableMethod · 0.95
parallel_asyncFunction · 0.90

Tested by

no test coverage detected