Azure Storage Block Blob backend for Celery.
| 22 | |
| 23 | |
| 24 | class AzureBlockBlobBackend(KeyValueStoreBackend): |
| 25 | """Azure Storage Block Blob backend for Celery.""" |
| 26 | |
def __init__(self,
             url=None,
             container_name=None,
             *args,
             **kwargs):
    """Set up the backend from a backend URL and the app configuration.

    Supported URL formats:

    azureblockblob://CONNECTION_STRING
    azureblockblob://DefaultAzureCredential@STORAGE_ACCOUNT_URL
    azureblockblob://ManagedIdentityCredential@STORAGE_ACCOUNT_URL

    Arguments:
        url: Backend URL in one of the formats above; everything after
            the scheme prefix is stored as the connection string.
        container_name: Container to use.  When falsy, the
            ``azureblockblob_container_name`` setting is read instead.
        *args: Forwarded unchanged to the parent initializer.
        **kwargs: Forwarded unchanged to the parent initializer.

    Raises:
        ImproperlyConfigured: If the azure-storage-blob library is not
            installed, if its major version is lower than 12, or if
            ``_parse_url`` rejects ``url``.
    """
    super().__init__(*args, **kwargs)

    # Compare the *numeric* major version.  A plain string comparison is
    # lexicographic, so '2.1.0' < '12' and '9.0.0' < '12' are both False
    # and legacy SDKs would slip through the guard.
    if azurestorage is None:
        unsupported = True
    else:
        major_text = str(azurestorage.__version__).partition('.')[0]
        # A version string without a numeric major component cannot be
        # judged here, so it is let through rather than rejected.
        unsupported = major_text.isdigit() and int(major_text) < 12

    if unsupported:
        raise ImproperlyConfigured(
            "You need to install the azure-storage-blob v12 library to "
            "use the AzureBlockBlob backend")

    conf = self.app.conf

    self._connection_string = self._parse_url(url)

    # An explicit argument wins; otherwise the setting is mandatory
    # (subscript access, no default).
    self._container_name = (
        container_name or
        conf["azureblockblob_container_name"])

    # Optional settings with their fallback values.
    self.base_path = conf.get('azureblockblob_base_path', '')
    self._connection_timeout = conf.get(
        'azureblockblob_connection_timeout', 20
    )
    self._read_timeout = conf.get('azureblockblob_read_timeout', 120)
| 59 | |
@classmethod
def _parse_url(cls, url, prefix=AZURE_BLOCK_BLOB_CONNECTION_PREFIX):
    """Return the part of ``url`` that follows ``prefix``.

    Arguments:
        url: Backend URL, expected to start with ``prefix``.
        prefix: Scheme prefix to strip from the front of ``url``.

    Returns:
        The non-empty remainder of ``url`` after ``prefix``.

    Raises:
        ImproperlyConfigured: If ``url`` is not a string (for example
            ``None``), does not start with ``prefix``, or has nothing
            after the prefix.
    """
    # Reject a missing URL or a foreign prefix up front: slicing by
    # length alone would raise a bare TypeError for None and silently
    # chop the head off a string that does not carry the prefix.
    if not isinstance(url, str) or not url.startswith(prefix):
        raise ImproperlyConfigured("Invalid URL")

    connection_string = url[len(prefix):]
    if not connection_string:
        raise ImproperlyConfigured("Invalid URL")

    return connection_string
| 67 | |
| 68 | @cached_property |
| 69 | def _blob_service_client(self): |
| 70 | """Return the Azure Storage Blob service client. |
| 71 | |
| 72 | If this is the first call to the property, the client is created and |
| 73 | the container is created if it doesn't yet exist. |
| 74 | |
| 75 | """ |
| 76 | if ( |
| 77 | "DefaultAzureCredential" in self._connection_string or |
| 78 | "ManagedIdentityCredential" in self._connection_string |
| 79 | ): |
| 80 | # Leveraging the work that Kombu already did for us |
| 81 | credential_, url = AzureStorageQueuesTransport.parse_uri( |
no outgoing calls