(self, r, propagate=False, max_retries=10, **kwargs)
| 107 | return retry_over_time(*args, **kwargs) |
| 108 | |
| 109 | def join(self, r, propagate=False, max_retries=10, **kwargs): |
| 110 | if self.no_join: |
| 111 | return |
| 112 | if not isinstance(r, ResultSet): |
| 113 | r = self.app.ResultSet([r]) |
| 114 | received = [] |
| 115 | |
| 116 | def on_result(task_id, value): |
| 117 | received.append(task_id) |
| 118 | |
| 119 | for i in range(max_retries) if max_retries else count(0): |
| 120 | received[:] = [] |
| 121 | try: |
| 122 | return r.get(callback=on_result, propagate=propagate, **kwargs) |
| 123 | except (socket.timeout, TimeoutError) as exc: |
| 124 | waiting_for = self.missing_results(r) |
| 125 | self.remark( |
| 126 | 'Still waiting for {}/{}: [{}]: {!r}'.format( |
| 127 | len(r) - len(received), len(r), |
| 128 | truncate(', '.join(waiting_for)), exc), '!', |
| 129 | ) |
| 130 | except self.connerrors as exc: |
| 131 | self.remark(f'join: connection lost: {exc!r}', '!') |
| 132 | raise AssertionError('Test failed: Missing task results') |
| 133 | |
| 134 | def inspect(self, timeout=3.0): |
| 135 | return self.app.control.inspect(timeout=timeout) |
no test coverage detected