hub / github.com/celery/celery / test_apply_chord

Method test_apply_chord

t/unit/backends/test_cache.py:72–79
(self)

Source from the content-addressed store, hash-verified

70          assert isinstance(self.tb.get_result(self.tid), KeyError)
71
72      def test_apply_chord(self):
73          tb = CacheBackend(backend='memory://', app=self.app)
74          result_args = (
75              uuid(),
76              [self.app.AsyncResult(uuid()) for _ in range(3)],
77          )
78          tb.apply_chord(result_args, None)
79          assert self.app.GroupResult.restore(result_args[0], backend=tb) == self.app.GroupResult(*result_args)
80
81      @patch('celery.result.GroupResult.restore')
82      def test_on_chord_part_return(self, restore):
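
The assertion on line 79 reads as a save-then-restore round trip: after `apply_chord` runs, restoring the group by its id should yield a value equal to one built from the same arguments. The sketch below shows that pattern in isolation. `ToyGroupResult`, `ToyMemoryBackend`, and `roundtrip_ok` are hypothetical names invented for illustration; they are not Celery classes and make no claim about how `CacheBackend` is implemented.

```python
from dataclasses import dataclass
from uuid import uuid4


@dataclass(frozen=True)
class ToyGroupResult:
    """Hypothetical stand-in for a group result: an id plus child task ids."""
    group_id: str
    child_ids: tuple


class ToyMemoryBackend:
    """Hypothetical dict-backed store; not Celery's CacheBackend."""

    def __init__(self):
        self._store = {}

    def apply_chord(self, result_args, body=None):
        # Persist the group under its id so it can be looked up later.
        group_id, child_ids = result_args
        self._store[group_id] = ToyGroupResult(group_id, tuple(child_ids))

    def restore(self, group_id):
        # Return the stored group, or None if nothing was saved under this id.
        return self._store.get(group_id)


def roundtrip_ok():
    """Mirror the shape of the test: save via apply_chord, restore, compare."""
    backend = ToyMemoryBackend()
    result_args = (str(uuid4()), [str(uuid4()) for _ in range(3)])
    backend.apply_chord(result_args, None)
    expected = ToyGroupResult(result_args[0], tuple(result_args[1]))
    return backend.restore(result_args[0]) == expected
```

Equality here comes from the dataclass comparing `group_id` and `child_ids` field by field, which is an assumption of the sketch rather than a statement about how the real `GroupResult` compares.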

Callers

nothing calls this directly

Calls 5

CacheBackend · Class · 0.90
GroupResult · Method · 0.80
AsyncResult · Method · 0.45
apply_chord · Method · 0.45
restore · Method · 0.45

Tested by

no test coverage detected