Prepare the connection for action. Arguments: write (bool): are we a writer?
(self, write=False)
| 140 | self._lock = threading.RLock() |
| 141 | |
| 142 | def _get_connection(self, write=False): |
| 143 | """Prepare the connection for action. |
| 144 | |
| 145 | Arguments: |
| 146 | write (bool): are we a writer? |
| 147 | """ |
| 148 | if self._session is not None: |
| 149 | return |
| 150 | self._lock.acquire() |
| 151 | try: |
| 152 | if self._session is not None: |
| 153 | return |
| 154 | # using either 'servers' or 'bundle_path' here: |
| 155 | if self.servers: |
| 156 | self._cluster = cassandra.cluster.Cluster( |
| 157 | self.servers, port=self.port, |
| 158 | auth_provider=self.auth_provider, |
| 159 | **self.cassandra_options) |
| 160 | else: |
| 161 | # 'bundle_path' is guaranteed to be set |
| 162 | self._cluster = cassandra.cluster.Cluster( |
| 163 | cloud={ |
| 164 | 'secure_connect_bundle': self.bundle_path, |
| 165 | }, |
| 166 | auth_provider=self.auth_provider, |
| 167 | **self.cassandra_options) |
| 168 | self._session = self._cluster.connect(self.keyspace) |
| 169 | |
| 170 | # We're forced to do concatenation below, as formatting would |
| 171 | # blow up on superficial %s that'll be processed by Cassandra |
| 172 | self._write_stmt = cassandra.query.SimpleStatement( |
| 173 | Q_INSERT_RESULT.format( |
| 174 | table=self.table, expires=self.cqlexpires), |
| 175 | ) |
| 176 | self._write_stmt.consistency_level = self.write_consistency |
| 177 | |
| 178 | self._read_stmt = cassandra.query.SimpleStatement( |
| 179 | Q_SELECT_RESULT.format(table=self.table), |
| 180 | ) |
| 181 | self._read_stmt.consistency_level = self.read_consistency |
| 182 | |
| 183 | if write: |
| 184 | # Only possible writers "workers" are allowed to issue |
| 185 | # CREATE TABLE. This is to prevent conflicting situations |
| 186 | # where both task-creator and task-executor would issue it |
| 187 | # at the same time. |
| 188 | |
| 189 | # Anyway; if you're doing anything critical, you should |
| 190 | # have created this table in advance, in which case |
| 191 | # this query will be a no-op (AlreadyExists) |
| 192 | make_stmt = cassandra.query.SimpleStatement( |
| 193 | Q_CREATE_RESULT_TABLE.format(table=self.table), |
| 194 | ) |
| 195 | make_stmt.consistency_level = self.write_consistency |
| 196 | |
| 197 | try: |
| 198 | self._session.execute(make_stmt) |
| 199 | except cassandra.AlreadyExists: |