(self, crawler: Crawler)
| 20 | lazy = True |
| 21 | |
| 22 | def __init__(self, crawler: Crawler): |
| 23 | if not is_botocore_available(): |
| 24 | raise NotConfigured("missing botocore library") |
| 25 | |
| 26 | super().__init__(crawler) |
| 27 | aws_access_key_id = crawler.settings["AWS_ACCESS_KEY_ID"] |
| 28 | aws_secret_access_key = crawler.settings["AWS_SECRET_ACCESS_KEY"] |
| 29 | aws_session_token = crawler.settings["AWS_SESSION_TOKEN"] |
| 30 | self.anon = not aws_access_key_id and not aws_secret_access_key |
| 31 | self._signer = None |
| 32 | if not self.anon: |
| 33 | import botocore.auth # noqa: PLC0415 |
| 34 | import botocore.credentials # noqa: PLC0415 |
| 35 | |
| 36 | SignerCls = botocore.auth.AUTH_TYPE_MAPS["s3"] |
| 37 | # botocore.auth.BaseSigner doesn't have an __init__() with args, only subclasses do |
| 38 | self._signer = SignerCls( # type: ignore[call-arg] |
| 39 | botocore.credentials.Credentials( |
| 40 | aws_access_key_id, aws_secret_access_key, aws_session_token |
| 41 | ) |
| 42 | ) |
| 43 | |
| 44 | _http_handler: BaseDownloadHandler = build_from_crawler( |
| 45 | load_object(crawler.settings.getwithbase("DOWNLOAD_HANDLERS")["https"]), |
| 46 | crawler, |
| 47 | ) |
| 48 | self._download_http = _http_handler.download_request |
| 49 | |
| 50 | async def download_request(self, request: Request) -> Response: |
| 51 | p = urlparse_cached(request) |
nothing calls this directly
no test coverage detected