MCPcopy
hub / github.com/celery/celery / test_apply_chord

Method test_apply_chord

t/unit/backends/test_base.py:227–234  ·  view source on GitHub ↗
(self, unlock='celery.chord_unlock')

Source from the content-addressed store, hash-verified

225 self.b.on_chord_part_return(None, None, None)
226
227 def test_apply_chord(self, unlock='celery.chord_unlock'):
228 self.app.tasks[unlock] = Mock()
229 header_result_args = (
230 uuid(),
231 [self.app.AsyncResult(x) for x in range(3)],
232 )
233 self.b.apply_chord(header_result_args, self.callback.s())
234 assert self.app.tasks[unlock].apply_async.call_count
235
236 def test_chord_unlock_queue(self, unlock='celery.chord_unlock'):
237 self.app.tasks[unlock] = Mock()

Callers

nothing calls this directly

Calls 3

AsyncResultMethod · 0.45
apply_chordMethod · 0.45
sMethod · 0.45

Tested by

no test coverage detected