MCPcopy
hub / github.com/celery/celery / __init__

Method __init__

celery/backends/cassandra.py:88–140  ·  view source on GitHub ↗
(self, servers=None, keyspace=None, table=None, entry_ttl=None,
                 port=None, bundle_path=None, **kwargs)

Source from the content-addressed store, hash-verified

86 supports_autoexpire = True # autoexpire supported via entry_ttl
87
88 def __init__(self, servers=None, keyspace=None, table=None, entry_ttl=None,
89 port=None, bundle_path=None, **kwargs):
90 super().__init__(**kwargs)
91
92 if not cassandra:
93 raise ImproperlyConfigured(E_NO_CASSANDRA)
94
95 conf = self.app.conf
96 self.servers = servers or conf.get('cassandra_servers', None)
97 self.bundle_path = bundle_path or conf.get(
98 'cassandra_secure_bundle_path', None)
99 self.port = port or conf.get('cassandra_port', None) or 9042
100 self.keyspace = keyspace or conf.get('cassandra_keyspace', None)
101 self.table = table or conf.get('cassandra_table', None)
102 self.cassandra_options = conf.get('cassandra_options', {})
103
104 # either servers or bundle path must be provided...
105 db_directions = self.servers or self.bundle_path
106 if not db_directions or not self.keyspace or not self.table:
107 raise ImproperlyConfigured(E_CASSANDRA_NOT_CONFIGURED)
108 # ...but not both:
109 if self.servers and self.bundle_path:
110 raise ImproperlyConfigured(E_CASSANDRA_MISCONFIGURED)
111
112 expires = entry_ttl or conf.get('cassandra_entry_ttl', None)
113
114 self.cqlexpires = (
115 Q_EXPIRES.format(expires) if expires is not None else '')
116
117 read_cons = conf.get('cassandra_read_consistency') or 'LOCAL_QUORUM'
118 write_cons = conf.get('cassandra_write_consistency') or 'LOCAL_QUORUM'
119
120 self.read_consistency = getattr(
121 cassandra.ConsistencyLevel, read_cons,
122 cassandra.ConsistencyLevel.LOCAL_QUORUM)
123 self.write_consistency = getattr(
124 cassandra.ConsistencyLevel, write_cons,
125 cassandra.ConsistencyLevel.LOCAL_QUORUM)
126
127 self.auth_provider = None
128 auth_provider = conf.get('cassandra_auth_provider', None)
129 auth_kwargs = conf.get('cassandra_auth_kwargs', None)
130 if auth_provider and auth_kwargs:
131 auth_provider_class = getattr(cassandra.auth, auth_provider, None)
132 if not auth_provider_class:
133 raise ImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER)
134 self.auth_provider = auth_provider_class(**auth_kwargs)
135
136 self._cluster = None
137 self._session = None
138 self._write_stmt = None
139 self._read_stmt = None
140 self._lock = threading.RLock()
141
142 def _get_connection(self, write=False):
143 """Prepare the connection for action.

Callers

nothing calls this directly

Calls 3

getMethod · 0.45
formatMethod · 0.45

Tested by

no test coverage detected