(self)
| 207 | assert x._session is not None |
| 208 | |
def test_init_session(self):
    # Happy path: Cluster.connect succeeds, so the backend should end up
    # holding a session and keep reusing it on later calls.
    from celery.backends import cassandra as mod

    class StubCluster:
        # Accepts any constructor/connect arguments; connect() hands back
        # a fresh Mock each time it is called.

        def __init__(self, *args, **kwargs):
            pass

        def connect(self, *args, **kwargs):
            return Mock()

    # Replace the module-level ``cassandra`` reference with a fake whose
    # ``cluster.Cluster`` is the stub above.
    # NOTE(review): the replacement is not undone inside this test --
    # confirm a fixture elsewhere resets ``mod.cassandra``.
    fake_driver = Mock()
    fake_driver.cluster = Mock()
    fake_driver.cluster.Cluster = StubCluster
    mod.cassandra = fake_driver

    backend = mod.CassandraBackend(app=self.app)

    # No session exists until a connection is requested.
    assert backend._session is None
    backend._get_connection(write=True)
    assert backend._session is not None

    # A second request must keep the very same session object.
    first_session = backend._session
    backend._get_connection()
    assert backend._session is first_session
| 233 | |
| 234 | def test_auth_provider(self): |
| 235 | # Ensure valid auth_provider works properly, and invalid one raises |
nothing calls this directly
no test coverage detected