MCPcopy
hub / github.com/celery/celery / __init__

Method __init__

celery/backends/gcs.py:46–84  ·  view source on GitHub ↗
(self, **kwargs)

Source from the content-addressed store, hash-verified

44 """Google Cloud Storage task result backend."""
45
def __init__(self, **kwargs):
    """Set up the GCS result backend from the app configuration.

    Settings are read from ``self.app.conf``.  When a backend URL is
    present, the parameters returned by ``_params_from_url()`` are merged
    into the app configuration first (after ``dictfilter``), so they take
    precedence over values already configured.

    Settings read: ``gcs_bucket`` (required), ``gcs_project`` (required),
    ``gcs_base_path``, ``gcs_threadpool_maxsize`` and ``gcs_ttl``.

    Raises:
        ImproperlyConfigured: if ``storage`` is falsy (the message asks
            for google-cloud-storage to be installed), if the bucket or
            project setting is missing, if the ttl is negative, or if a
            positive ttl is requested while
            ``_is_bucket_lifecycle_rule_exists()`` reports no rule.
    """
    if not storage:
        raise ImproperlyConfigured(
            'You must install google-cloud-storage to use gcs backend'
        )
    super().__init__(**kwargs)
    self._client_lock = RLock()
    # PID of the constructing process; presumably compared later to
    # detect a fork -- confirm against the client accessor.
    self._pid = getpid()
    self._retry_policy = DEFAULT_RETRY
    # No client is created here; presumably built on first use.
    self._client = None

    conf = self.app.conf
    if self.url:
        # NOTE: this writes the URL-derived settings into the shared app
        # configuration, not just onto this backend instance.
        url_params = self._params_from_url()
        conf.update(**dictfilter(url_params))

    self.bucket_name = conf.get('gcs_bucket')
    if not self.bucket_name:
        raise ImproperlyConfigured(
            'Missing bucket name: specify gcs_bucket to use gcs backend'
        )
    self.project = conf.get('gcs_project')
    if not self.project:
        raise ImproperlyConfigured(
            'Missing project: specify gcs_project to use gcs backend'
        )
    # ``or ''`` mirrors the ttl handling below: a setting explicitly set
    # to None must not crash on ``.strip``.
    self.base_path = (conf.get('gcs_base_path') or '').strip('/')
    self._threadpool_maxsize = int(conf.get('gcs_threadpool_maxsize', 10))
    self.ttl = float(conf.get('gcs_ttl') or 0)
    if self.ttl < 0:
        raise ImproperlyConfigured(
            f'Invalid ttl: {self.ttl} must be greater than or equal to 0'
        )
    # A zero ttl skips the lifecycle-rule lookup entirely.
    if self.ttl and not self._is_bucket_lifecycle_rule_exists():
        raise ImproperlyConfigured(
            'Missing lifecycle rule to use gcs backend with ttl on '
            f'bucket: {self.bucket_name}'
        )
85
86 def get(self, key):
87 key = bytes_to_str(key)

Callers

nothing calls this directly

Calls 6

_params_from_urlMethod · 0.95
__init__Method · 0.45
updateMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected