(self)
| 428 | |
@coroutine_test
async def test_store_with_acl(self):
    """Check that a custom ACL is kept on the storage and sent on upload.

    The S3 client is swapped for a ``MagicMock`` before ``store`` is
    called; the test then inspects the keyword arguments recorded for
    ``upload_fileobj``.
    """
    feed_storage = S3FeedStorage(
        "s3://mybucket/export.csv", "access_key", "secret_key", "custom-acl"
    )

    # Each constructor argument must be exposed unchanged as an attribute.
    expected_attributes = (
        ("access_key", "access_key"),
        ("secret_key", "secret_key"),
        ("acl", "custom-acl"),
    )
    for attribute, expected_value in expected_attributes:
        assert getattr(feed_storage, attribute) == expected_value

    # Replace the client so the upload call is recorded by the mock.
    feed_storage.s3_client = mock.MagicMock()
    payload = BytesIO(b"test file")
    await maybe_deferred_to_future(feed_storage.store(payload))

    # call_args is an (args, kwargs) pair; only the keyword half matters here.
    _positional, upload_kwargs = feed_storage.s3_client.upload_fileobj.call_args
    assert upload_kwargs["ExtraArgs"]["ACL"] == "custom-acl"
| 442 | |
| 443 | def test_overwrite_default(self): |
| 444 | with LogCapture() as log: |
nothing calls this directly
no test coverage detected