(self, ResultCls, setup=None, **kwargs)
| 132 | |
@contextmanager
def _chord_context(self, ResultCls, setup=None, **kwargs):
    """Invoke the patched ``celery.chord_unlock`` task once, then yield.

    While the context is active ``result.GroupResult`` is replaced by
    ``ResultCls``; it is put back when the context exits.  The unlock task
    is called exactly once before the ``yield``, and a ``Retry`` raised by
    that call is swallowed so the caller can inspect the mocks instead.

    :param ResultCls: class installed as ``result.GroupResult`` and also
        passed to the unlock task as ``GroupResult``.
    :param setup: optional callable; when truthy it is called with the
        throwaway callback task before the unlock task runs.
    :param kwargs: extra keyword arguments forwarded to the unlock task.

    Yields a ``(callback_signature, retry, fail_current)`` tuple where
    ``retry`` is the second item produced by ``patch_unlock_retry`` and
    ``fail_current`` is the mock installed as
    ``self.app.backend.fail_from_current_stack``.
    """
    @self.app.task(shared=False)
    def callback(*args, **kwargs):
        pass
    self.app.finalize()

    # Install the GroupResult class under test; the outer ``finally``
    # below restores the saved one.
    saved_group_result = result.GroupResult
    result.GroupResult = ResultCls

    # The callback must never really be dispatched: replace apply_async
    # with a mock and give the signature a fixed, recognisable id.
    callback.apply_async = Mock()
    callback_sig = callback.s()
    callback_sig.id = 'callback_id'

    # NOTE: this backend attribute is replaced here and is not restored
    # by this helper.
    fail_current = Mock()
    self.app.backend.fail_from_current_stack = fail_current

    try:
        with patch_unlock_retry(self.app) as (unlock, retry):
            # Temporarily make canvas.maybe_signature the passthru helper
            # for the duration of the unlock call only.
            saved_maybe_signature = canvas.maybe_signature
            canvas.maybe_signature = passthru
            if setup:
                setup(callback)
            try:
                # Sanity check: the registered task is the patched one.
                assert self.app.tasks['celery.chord_unlock'] is unlock
                try:
                    member_results = [
                        self.app.AsyncResult(task_id)
                        for task_id in ['1', 2, 3]
                    ]
                    unlock(
                        'group_id',
                        callback_sig,
                        result=member_results,
                        GroupResult=ResultCls,
                        **kwargs
                    )
                except Retry:
                    # A retry request is an expected outcome; callers
                    # assert on the ``retry`` object instead.
                    pass
            finally:
                canvas.maybe_signature = saved_maybe_signature
            # Yield inside the ``with`` so the unlock/retry patch is still
            # in place while the caller makes its assertions.
            yield callback_sig, retry, fail_current
    finally:
        result.GroupResult = saved_group_result
| 169 | |
| 170 | def test_when_not_ready(self): |
| 171 | class NeverReady(TSR): |
no test coverage detected