(self)
| 70 | assert isinstance(self.tb.get_result(self.tid), KeyError) |
| 71 | |
def test_apply_chord(self):
    """Check that a group passed to ``apply_chord`` can be restored.

    Builds a header made of a fresh group id and three ``AsyncResult``
    instances, hands it to ``apply_chord`` on a ``memory://`` cache
    backend (with ``None`` as the body), and asserts that restoring the
    group by id from that backend compares equal to a ``GroupResult``
    constructed from the same id and results.
    """
    backend = CacheBackend(backend='memory://', app=self.app)

    # Same evaluation order as a tuple literal: group id first, members second.
    group_id = uuid()
    members = [self.app.AsyncResult(uuid()) for _ in range(3)]

    backend.apply_chord((group_id, members), None)

    restored = self.app.GroupResult.restore(group_id, backend=backend)
    assert restored == self.app.GroupResult(group_id, members)
| 80 | |
| 81 | @patch('celery.result.GroupResult.restore') |
| 82 | def test_on_chord_part_return(self, restore): |
nothing calls this directly
no test coverage detected