Google Cloud Storage task result backend. Uses Firestore for chord ref count.
| 159 | |
| 160 | |
| 161 | class GCSBackend(GCSBackendBase): |
| 162 | """Google Cloud Storage task result backend. |
| 163 | |
| 164 | Uses Firestore for chord ref count. |
| 165 | """ |
| 166 | |
| 167 | implements_incr = True |
| 168 | supports_native_join = True |
| 169 | |
| 170 | # Firestore parameters |
| 171 | _collection_name = 'celery' |
| 172 | _field_count = 'chord_count' |
| 173 | _field_expires = 'expires_at' |
| 174 | |
def __init__(self, **kwargs):
    """Set up the backend and verify its Firestore prerequisites.

    All keyword arguments are forwarded unchanged to the parent
    initializer.

    Raises:
        ImproperlyConfigured: if ``firestore`` or ``firestore_admin_v1``
            is falsy (presumably because google-cloud-firestore could
            not be imported -- confirm against the module's import
            guard), or if ``_is_firestore_ttl_policy_enabled()`` returns
            a falsy value.
    """
    firestore_available = firestore and firestore_admin_v1
    if not firestore_available:
        raise ImproperlyConfigured(
            'You must install google-cloud-firestore to use gcs backend'
        )
    super().__init__(**kwargs)

    # The client itself is built on first access of ``firestore_client``;
    # only the lock guarding that creation is prepared here.
    self._firestore_lock = RLock()
    self._firestore_client = None

    # The ``firestore_project`` setting wins; otherwise reuse
    # ``self.project``.
    conf = self.app.conf
    self.firestore_project = conf.get('firestore_project', self.project)

    if self._is_firestore_ttl_policy_enabled():
        return

    message = (
        'Missing TTL policy to use gcs backend with ttl on '
        f'Firestore collection: {self._collection_name} '
        f'project: {self.firestore_project}'
    )
    raise ImproperlyConfigured(message)
| 194 | |
@property
def firestore_client(self):
    """Return the Firestore client for the current process.

    A client is created when none is cached yet or when the cached one
    was recorded under a different pid; otherwise the cached client is
    returned as-is.
    """
    # Creating a client is expensive, so the whole check-and-create
    # sequence runs while holding the lock.
    with self._firestore_lock:
        cached = self._firestore_client
        if not cached or self._pid != getpid():
            # Nothing usable for this pid (e.g. after a fork): build a
            # fresh client and remember which process owns it.
            self._firestore_client = firestore.Client(
                project=self.firestore_project
            )
            self._pid = getpid()
        return self._firestore_client
| 209 | |
| 210 | def _is_firestore_ttl_policy_enabled(self): |
| 211 | client = firestore_admin_v1.FirestoreAdminClient() |
| 212 | |
| 213 | name = ( |
| 214 | f"projects/{self.firestore_project}" |
| 215 | f"/databases/(default)/collectionGroups/{self._collection_name}" |
| 216 | f"/fields/{self._field_expires}" |
| 217 | ) |
| 218 | request = firestore_admin_v1.GetFieldRequest(name=name) |
no outgoing calls