MCPcopy
hub / github.com/celery/celery / _chord_context

Method _chord_context

t/unit/tasks/test_chord.py:134–168  ·  view source on GitHub ↗
(self, ResultCls, setup=None, **kwargs)

Source from the content-addressed store, hash-verified

132
@contextmanager
def _chord_context(self, ResultCls, setup=None, **kwargs):
    """Run the patched ``celery.chord_unlock`` task once and yield the mocks.

    A no-op ``callback`` task is registered on ``self.app`` and the app is
    finalized.  ``result.GroupResult`` is swapped for ``ResultCls`` for the
    lifetime of the context, ``callback.apply_async`` and
    ``self.app.backend.fail_from_current_stack`` are replaced with ``Mock``
    objects, and ``canvas.maybe_signature`` is replaced with ``passthru``
    while the unlock task is invoked.

    :param ResultCls: class installed as ``result.GroupResult`` and passed
        to the unlock task as ``GroupResult``.
    :param setup: optional callable invoked as ``setup(callback)`` before
        the unlock task is called.
    :param kwargs: extra keyword arguments forwarded to the unlock task.

    Yields a ``(callback_s, retry, fail_current)`` tuple: the callback
    signature (its id is ``'callback_id'``), the ``retry`` object provided
    by ``patch_unlock_retry``, and the ``Mock`` standing in for
    ``fail_from_current_stack``.
    """
    @self.app.task(shared=False)
    def callback(*args, **kwargs):
        pass
    self.app.finalize()

    # Swap in the result class under test; the outer ``finally`` restores it.
    pts, result.GroupResult = result.GroupResult, ResultCls
    callback.apply_async = Mock()
    callback_s = callback.s()
    callback_s.id = 'callback_id'
    fail_current = self.app.backend.fail_from_current_stack = Mock()
    try:
        with patch_unlock_retry(self.app) as (unlock, retry):
            signature, canvas.maybe_signature = (
                canvas.maybe_signature, passthru,
            )
            try:
                # ``setup`` runs inside the try so that an exception raised
                # by it cannot leave ``canvas.maybe_signature`` patched.
                if setup:
                    setup(callback)
                assert self.app.tasks['celery.chord_unlock'] is unlock
                try:
                    unlock(
                        'group_id', callback_s,
                        result=[
                            self.app.AsyncResult(r) for r in ['1', 2, 3]
                        ],
                        GroupResult=ResultCls, **kwargs
                    )
                except Retry:
                    # Swallowed so the context can still yield.
                    # NOTE(review): presumably callers assert on the
                    # yielded ``retry`` object instead -- confirm.
                    pass
            finally:
                canvas.maybe_signature = signature
            # Yield while ``patch_unlock_retry`` is still active.
            yield callback_s, retry, fail_current
    finally:
        result.GroupResult = pts
169
170 def test_when_not_ready(self):
171 class NeverReady(TSR):

Calls 5

patch_unlock_retry — Function · 0.85
setup — Function · 0.50
finalize — Method · 0.45
s — Method · 0.45
AsyncResult — Method · 0.45

Tested by

no test coverage detected