(self, size=1)
| 349 | |
| 350 | @contextmanager |
| 351 | def chord_context(self, size=1): |
| 352 | with patch('celery.backends.redis.maybe_signature') as ms: |
| 353 | request = Mock(name='request') |
| 354 | request.id = 'id1' |
| 355 | group_id = 'gid1' |
| 356 | request.group = group_id |
| 357 | request.group_index = None |
| 358 | tasks = [ |
| 359 | self.create_task(i, group_id=request.group) |
| 360 | for i in range(size) |
| 361 | ] |
| 362 | callback = ms.return_value = Signature('add') |
| 363 | callback.id = 'id1' |
| 364 | self.b.set_chord_size(group_id, size) |
| 365 | callback.delay = Mock(name='callback.delay') |
| 366 | yield tasks, request, callback |
| 367 | |
| 368 | def setup_method(self): |
| 369 | self.Backend = self.get_backend() |
no test coverage detected