hub / github.com/celery/celery / test_memory_client_is_shared

Method test_memory_client_is_shared

t/unit/backends/test_cache.py:38–46

This test verifies that memory:// backend state is shared across multiple threads.

(self)

Source from the content-addressed store, hash-verified

36             CacheBackend(backend=None, app=self.app)
37
38     def test_memory_client_is_shared(self):
39         """This test verifies that memory:// backend state is shared over multiple threads"""
40         from threading import Thread
41         t = Thread(
42             target=lambda: CacheBackend(backend='memory://', app=self.app).set('test', 12345)
43         )
44         t.start()
45         t.join()
46         assert self.tb.client.get('test') == 12345
47
48     def test_mark_as_done(self):
49         assert self.tb.get_state(self.tid) == states.PENDING
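
The property this test checks can be illustrated without Celery. The following is a minimal sketch, not Celery's implementation: `_SHARED_STORE`, `InMemoryClient`, and `write_from_thread` are hypothetical names invented for illustration. It assumes the in-memory client keeps its data in one process-wide object, so a value written by a client created in a worker thread is visible to a separate client created in the main thread.

```python
from threading import Thread

# Hypothetical process-wide store; an assumption for illustration,
# not Celery's actual data structure.
_SHARED_STORE = {}


class InMemoryClient:
    """Toy client: every instance reads and writes the same module-level dict."""

    def __init__(self):
        self._store = _SHARED_STORE

    def set(self, key, value):
        self._store[key] = value

    def get(self, key):
        # Returns None when the key has never been set.
        return self._store.get(key)


def write_from_thread(key, value):
    """Create a fresh client inside a worker thread and write one value."""
    worker = Thread(target=lambda: InMemoryClient().set(key, value))
    worker.start()
    worker.join()  # wait for the write to finish before reading


# Mirror the shape of the test: write in a thread, read from the main thread.
write_from_thread('test', 12345)
reader = InMemoryClient()
assert reader.get('test') == 12345
```

If each client instead created its own private dict in `__init__`, the final assertion would fail, which is the kind of regression a test like this is meant to catch.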

Callers

nothing calls this directly

Calls 6

CacheBackend (Class) · 0.90
Thread (Class) · 0.85
set (Method) · 0.45
start (Method) · 0.45
join (Method) · 0.45
get (Method) · 0.45

Tested by

no test coverage detected