MCPcopy
hub / github.com/scrapy/scrapy / Slot

Class Slot

scrapy/core/downloader/__init__.py:45–80  ·  view source on GitHub ↗

Downloader slot

Source from the content-addressed store, hash-verified

43
44@dataclass(slots=True, eq=False)
45class Slot:
46 """Downloader slot"""
47
48 concurrency: int
49 delay: float
50 randomize_delay: bool
51
52 active: set[Request] = field(default_factory=set, init=False, repr=False)
53 queue: deque[tuple[Request, Deferred[Response]]] = field(
54 default_factory=deque, init=False, repr=False
55 )
56 transferring: set[Request] = field(default_factory=set, init=False, repr=False)
57 lastseen: float = field(default=0, init=False, repr=False)
58 latercall: CallLaterResult | None = field(default=None, init=False, repr=False)
59
60 def free_transfer_slots(self) -> int:
61 return self.concurrency - len(self.transferring)
62
63 def download_delay(self) -> float:
64 if self.randomize_delay:
65 return random.uniform(0.5 * self.delay, 1.5 * self.delay) # noqa: S311
66 return self.delay
67
68 def close(self) -> None:
69 if self.latercall:
70 self.latercall.cancel()
71 self.latercall = None
72
73 def __str__(self) -> str:
74 return (
75 f"<downloader.Slot concurrency={self.concurrency!r} "
76 f"delay={self.delay:.2f} randomize_delay={self.randomize_delay!r} "
77 f"len(active)={len(self.active)} len(queue)={len(self.queue)} "
78 f"len(transferring)={len(self.transferring)} "
79 f"lastseen={datetime.fromtimestamp(self.lastseen).isoformat()}>"
80 )
81
82
83def _get_concurrency_delay(

Callers 3

test_repr (Method) · 0.90
test_params (Function) · 0.90
_get_slot (Method) · 0.70

Calls

no outgoing calls

Tested by 2

test_repr (Method) · 0.72
test_params (Function) · 0.72