MCPcopy
hub / github.com/celery/celery / test_get_blob

Method test_get_blob

t/unit/backends/test_gcs.py:224–237  ·  view source on GitHub ↗
(self, mock_firestore_ttl, mock_bucket)

Source from the content-addressed store, hash-verified

222 @patch.object(GCSBackend, 'bucket')
223 @patch.object(GCSBackend, '_is_firestore_ttl_policy_enabled')
224 def test_get_blob(self, mock_firestore_ttl, mock_bucket):
225 key = 'test_key'
226 mock_blob = MagicMock()
227 mock_bucket.blob.return_value = mock_blob
228 mock_firestore_ttl.return_value = True
229
230 backend = GCSBackend(app=self.app)
231 result = backend._get_blob(key)
232
233 key_bucket_path = (
234 f'{backend.base_path}/{key}' if backend.base_path else key
235 )
236 mock_bucket.blob.assert_called_once_with(key_bucket_path)
237 assert result == mock_blob
238
239 @patch('celery.backends.gcs.Client')
240 @patch('celery.backends.gcs.getpid')

Callers

nothing calls this directly

Calls 2

GCSBackendClass · 0.90
_get_blobMethod · 0.80

Tested by

no test coverage detected