MCPcopy
hub / github.com/celery/celery / test_init_session

Method test_init_session

t/unit/backends/test_cassandra.py:209–232  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

207 assert x._session is not None
208
def test_init_session(self):
    # Happy path: Cluster.connect succeeds, so the backend should end up
    # holding a session and keep that same session on later calls.
    from celery.backends import cassandra as mod

    class DummyCluster:
        # Stand-in for the driver's Cluster: accepts any constructor
        # arguments and hands back a fresh Mock from every connect().

        def __init__(self, *args, **kwargs):
            pass

        def connect(self, *args, **kwargs):
            return Mock()

    # Assemble the fake driver module first, then install it on the
    # backend module so CassandraBackend picks up DummyCluster.
    fake_driver = Mock()
    fake_driver.cluster = Mock()
    fake_driver.cluster.Cluster = DummyCluster
    mod.cassandra = fake_driver

    backend = mod.CassandraBackend(app=self.app)

    # No session exists until a connection is requested.
    assert backend._session is None
    backend._get_connection(write=True)
    assert backend._session is not None

    # connect() returns a new Mock each time, so an identical object here
    # means the second call left the existing session in place.
    first_session = backend._session
    backend._get_connection()
    assert first_session is backend._session
233
234 def test_auth_provider(self):
235 # Ensure valid auth_provider works properly, and invalid one raises

Callers

nothing calls this directly

Calls 1

_get_connectionMethod · 0.95

Tested by

no test coverage detected