(self)
| 267 | |
| 268 | @coroutine_test |
| 269 | async def test_store(self): |
| 270 | settings = { |
| 271 | "AWS_ACCESS_KEY_ID": "access_key", |
| 272 | "AWS_SECRET_ACCESS_KEY": "secret_key", |
| 273 | } |
| 274 | crawler = get_crawler(settings_dict=settings) |
| 275 | bucket = "mybucket" |
| 276 | key = "export.csv" |
| 277 | storage = S3FeedStorage.from_crawler(crawler, f"s3://{bucket}/{key}") |
| 278 | verifyObject(IFeedStorage, storage) |
| 279 | |
| 280 | file = mock.MagicMock() |
| 281 | |
| 282 | storage.s3_client = mock.MagicMock() |
| 283 | await maybe_deferred_to_future(storage.store(file)) |
| 284 | assert storage.s3_client.upload_fileobj.call_args == mock.call( |
| 285 | Bucket=bucket, Key=key, Fileobj=file |
| 286 | ) |
| 287 | |
| 288 | def test_init_without_acl(self): |
| 289 | storage = S3FeedStorage("s3://mybucket/export.csv", "access_key", "secret_key") |
nothing calls this directly
no test coverage detected