MCPcopy
hub / github.com/celery/celery / _get_connection

Method _get_connection

celery/backends/cassandra.py:142–212  ·  view source on GitHub ↗

Prepare the connection for action. Arguments: write (bool): are we a writer?

(self, write=False)

Source from the content-addressed store, hash-verified

140 self._lock = threading.RLock()
141
142 def _get_connection(self, write=False):
143 """Prepare the connection for action.
144
145 Arguments:
146 write (bool): are we a writer?
147 """
148 if self._session is not None:
149 return
150 self._lock.acquire()
151 try:
152 if self._session is not None:
153 return
154 # using either 'servers' or 'bundle_path' here:
155 if self.servers:
156 self._cluster = cassandra.cluster.Cluster(
157 self.servers, port=self.port,
158 auth_provider=self.auth_provider,
159 **self.cassandra_options)
160 else:
161 # 'bundle_path' is guaranteed to be set
162 self._cluster = cassandra.cluster.Cluster(
163 cloud={
164 'secure_connect_bundle': self.bundle_path,
165 },
166 auth_provider=self.auth_provider,
167 **self.cassandra_options)
168 self._session = self._cluster.connect(self.keyspace)
169
170 # We're forced to do concatenation below, as formatting would
171 # blow up on superficial %s that'll be processed by Cassandra
172 self._write_stmt = cassandra.query.SimpleStatement(
173 Q_INSERT_RESULT.format(
174 table=self.table, expires=self.cqlexpires),
175 )
176 self._write_stmt.consistency_level = self.write_consistency
177
178 self._read_stmt = cassandra.query.SimpleStatement(
179 Q_SELECT_RESULT.format(table=self.table),
180 )
181 self._read_stmt.consistency_level = self.read_consistency
182
183 if write:
184 # Only possible writers "workers" are allowed to issue
185 # CREATE TABLE. This is to prevent conflicting situations
186 # where both task-creator and task-executor would issue it
187 # at the same time.
188
189 # Anyway; if you're doing anything critical, you should
190 # have created this table in advance, in which case
191 # this query will be a no-op (AlreadyExists)
192 make_stmt = cassandra.query.SimpleStatement(
193 Q_CREATE_RESULT_TABLE.format(table=self.table),
194 )
195 make_stmt.consistency_level = self.write_consistency
196
197 try:
198 self._session.execute(make_stmt)
199 except cassandra.AlreadyExists:

Callers 5

test_init_with_cloudMethod · 0.95
test_init_sessionMethod · 0.95
_store_resultMethod · 0.95
_get_task_meta_forMethod · 0.95

Calls 7

acquireMethod · 0.80
ClusterMethod · 0.80
releaseMethod · 0.80
connectMethod · 0.45
formatMethod · 0.45
executeMethod · 0.45
shutdownMethod · 0.45

Tested by 3

test_init_with_cloudMethod · 0.76
test_init_sessionMethod · 0.76