(self, unlock='celery.chord_unlock')
| 225 | self.b.on_chord_part_return(None, None, None) |
| 226 | |
| 227 | def test_apply_chord(self, unlock='celery.chord_unlock'): |
| 228 | self.app.tasks[unlock] = Mock() |
| 229 | header_result_args = ( |
| 230 | uuid(), |
| 231 | [self.app.AsyncResult(x) for x in range(3)], |
| 232 | ) |
| 233 | self.b.apply_chord(header_result_args, self.callback.s()) |
| 234 | assert self.app.tasks[unlock].apply_async.call_count |
| 235 | |
| 236 | def test_chord_unlock_queue(self, unlock='celery.chord_unlock'): |
| 237 | self.app.tasks[unlock] = Mock() |
nothing calls this directly
no test coverage detected