hub / github.com/scrapy/scrapy / test_dequeue_priorities

Method test_dequeue_priorities

tests/test_scheduler.py:177–188
(self, jobdir: Path | None)

Source from the content-addressed store, hash-verified

    @coroutine_test
    async def test_dequeue_priorities(self, jobdir: Path | None) -> None:
        async with self.create_scheduler(jobdir) as scheduler:
            for url, priority in _PRIORITIES:
                scheduler.enqueue_request(Request(url, priority=priority))

            priorities = []
            while scheduler.has_pending_requests():
                request = scheduler.next_request()
                assert request is not None
                priorities.append(request.priority)

            assert priorities == sorted([x[1] for x in _PRIORITIES], key=lambda x: -x)


class TestSchedulerOnDiskBase(TestSchedulerBase):
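
The property this test checks (requests come back highest priority first, whatever order they were enqueued in) can be illustrated without a crawler. The following is a minimal sketch, not Scrapy's implementation. `ToyScheduler`, `drain_priorities`, and the `SAMPLE_PRIORITIES` values are hypothetical stand-ins invented for this example: the real `_PRIORITIES` fixture is not shown here, and the real scheduler is built through `create_scheduler`.

```python
import heapq
from itertools import count

# Hypothetical stand-in for _PRIORITIES: (url, priority) pairs in mixed order.
SAMPLE_PRIORITIES = [
    ("http://example.com/a", 1),
    ("http://example.com/b", -2),
    ("http://example.com/c", 2),
    ("http://example.com/d", 0),
    ("http://example.com/e", -1),
]


class ToyScheduler:
    """Hypothetical in-memory scheduler: higher priority is dequeued first."""

    def __init__(self):
        self._heap = []
        self._order = count()  # tie-breaker so equal priorities stay FIFO

    def enqueue_request(self, url, priority):
        # heapq is a min-heap, so store the negated priority.
        heapq.heappush(self._heap, (-priority, next(self._order), url))

    def has_pending_requests(self):
        return bool(self._heap)

    def next_request(self):
        if not self._heap:
            return None
        neg_priority, _, url = heapq.heappop(self._heap)
        return url, -neg_priority


def drain_priorities(pairs):
    """Enqueue every pair, then dequeue until empty, collecting priorities."""
    scheduler = ToyScheduler()
    for url, priority in pairs:
        scheduler.enqueue_request(url, priority)

    priorities = []
    while scheduler.has_pending_requests():
        request = scheduler.next_request()
        assert request is not None
        priorities.append(request[1])
    return priorities


if __name__ == "__main__":
    print(drain_priorities(SAMPLE_PRIORITIES))
```

The sketch mirrors the test's enqueue, drain, and compare shape: the drained list should equal the input priorities sorted in descending order, which is what `sorted(..., key=lambda x: -x)` produces in the original assertion.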

Callers

nothing calls this directly

Calls 5

Request · Class · 0.90
create_scheduler · Method · 0.80
enqueue_request · Method · 0.45
has_pending_requests · Method · 0.45
next_request · Method · 0.45

Tested by

no test coverage detected