| 97 | |
| 98 | |
| 99 | class Downloader: |
| 100 | DOWNLOAD_SLOT = "download_slot" |
| 101 | _SLOT_GC_INTERVAL: float = 60.0 # seconds |
| 102 | |
def __init__(self, crawler: Crawler):
    """Initialise downloader state from *crawler*.

    Keeps references to the crawler, its settings and its signal manager,
    creates the empty slot and active-request containers, builds the
    download handlers and the downloader middleware manager, and reads the
    concurrency limits, the delay-randomisation flag and the
    ``DOWNLOAD_SLOTS`` mapping from the settings.

    :param crawler: the crawler this downloader belongs to.
    """
    settings = crawler.settings

    # References to the owning crawler and its shared services.
    self.crawler: Crawler = crawler
    self.settings: BaseSettings = settings
    self.signals: SignalManager = crawler.signals

    # Bookkeeping containers, both empty at start.
    self.slots: dict[str, Slot] = {}
    self.active: set[Request] = set()

    self.handlers: DownloadHandlers = DownloadHandlers(crawler)

    # Limits and flags read once from the settings.
    self.total_concurrency: int = settings.getint("CONCURRENT_REQUESTS")
    self.domain_concurrency: int = settings.getint("CONCURRENT_REQUESTS_PER_DOMAIN")
    self.ip_concurrency: int = settings.getint("CONCURRENT_REQUESTS_PER_IP")
    self.randomize_delay: bool = settings.getbool("RANDOMIZE_DOWNLOAD_DELAY")

    self.middleware: DownloaderMiddlewareManager = (
        DownloaderMiddlewareManager.from_crawler(crawler)
    )

    # No looping call exists yet; the attribute starts out as None.
    self._slot_gc_loop: AsyncioLoopingCall | LoopingCall | None = None

    # Per-slot overrides, keyed by slot key.
    self.per_slot_settings: dict[str, dict[str, Any]] = settings.getdict(
        "DOWNLOAD_SLOTS"
    )
| 123 | |
@inlineCallbacks
@_warn_spider_arg
def fetch(
    self, request: Request, spider: Spider | None = None
) -> Generator[Deferred[Any], Any, Response | Request]:
    """Download *request* through the downloader middleware.

    The request is kept in ``self.active`` while it is in flight and is
    dropped from it again when the download finishes, whether it succeeds
    or fails.

    :param request: the request to download.
    :param spider: not used in this body.
        NOTE(review): presumably retained for backward compatibility and
        handled by ``_warn_spider_arg`` -- confirm.
    :returns: whatever ``self.middleware.download_async`` produces for the
        request, annotated as ``Response | Request``.
    """
    self.active.add(request)
    try:
        result: Response | Request = yield deferred_from_coro(
            self.middleware.download_async(self._enqueue_request, request)
        )
        return result
    finally:
        # discard() rather than remove(): ``active`` is a set, so when the
        # same request object is in flight more than once, the first call
        # to finish has already dropped it. remove() would then raise
        # KeyError from this ``finally`` and replace the real result or
        # exception of the download.
        self.active.discard(request)
| 139 | |
def needs_backout(self) -> bool:
    """Return ``True`` once the active requests reach ``total_concurrency``.

    Compares the size of ``self.active`` against ``self.total_concurrency``;
    the limit itself counts as reached.
    """
    in_flight = len(self.active)
    return self.total_concurrency <= in_flight
| 142 | |
| 143 | @_warn_spider_arg |
| 144 | def _get_slot( |
| 145 | self, request: Request, spider: Spider | None = None |
| 146 | ) -> tuple[str, Slot]: |
| 147 | key = self.get_slot_key(request) |
| 148 | if key not in self.slots: |
| 149 | assert self.crawler.spider |
| 150 | slot_settings = self.per_slot_settings.get(key, {}) |
| 151 | conc = self.ip_concurrency or self.domain_concurrency |
| 152 | conc, delay = _get_concurrency_delay( |
| 153 | conc, self.crawler.spider, self.settings |
| 154 | ) |
| 155 | conc, delay = ( |
| 156 | slot_settings.get("concurrency", conc), |
no outgoing calls