
Method join

celery/contrib/testing/manager.py:109–132
(self, r, propagate=False, max_retries=10, **kwargs)

Source from the content-addressed store, hash-verified

        return retry_over_time(*args, **kwargs)

    def join(self, r, propagate=False, max_retries=10, **kwargs):
        if self.no_join:
            return
        if not isinstance(r, ResultSet):
            r = self.app.ResultSet([r])
        received = []

        def on_result(task_id, value):
            received.append(task_id)

        for i in range(max_retries) if max_retries else count(0):
            received[:] = []
            try:
                return r.get(callback=on_result, propagate=propagate, **kwargs)
            except (socket.timeout, TimeoutError) as exc:
                waiting_for = self.missing_results(r)
                self.remark(
                    'Still waiting for {}/{}: [{}]: {!r}'.format(
                        len(r) - len(received), len(r),
                        truncate(', '.join(waiting_for)), exc), '!',
                )
            except self.connerrors as exc:
                self.remark(f'join: connection lost: {exc!r}', '!')
        raise AssertionError('Test failed: Missing task results')

    def inspect(self, timeout=3.0):
        return self.app.control.inspect(timeout=timeout)
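
The retry loop in join can be easier to follow in isolation. The block below is a minimal, standard-library-only sketch of the same pattern, not Celery's implementation: FlakyResult, join_sketch, and the remark parameter are hypothetical names introduced for illustration, and the real method additionally tracks received task ids and handles connection errors. It is meant to show two things: a timeout is reported and retried rather than raised, and a falsy max_retries (0 or None) selects an unbounded count(0) iterator, because the conditional expression is evaluated once to choose the iterable for the whole loop.

```python
import socket
from itertools import count


class FlakyResult:
    """Hypothetical stand-in for a result set: times out N times, then returns."""

    def __init__(self, failures, value):
        self.failures = failures
        self.value = value
        self.calls = 0

    def get(self, **kwargs):
        self.calls += 1
        if self.calls <= self.failures:
            raise socket.timeout('simulated timeout')
        return self.value


def join_sketch(result, max_retries=10, remark=print, **kwargs):
    """Retry result.get() on timeout; a falsy max_retries means retry forever."""
    # The conditional expression picks the iterable once, before the loop starts.
    attempts = range(max_retries) if max_retries else count(0)
    for attempt in attempts:
        try:
            return result.get(**kwargs)
        except (socket.timeout, TimeoutError) as exc:
            remark(f'attempt {attempt + 1}: still waiting: {exc!r}')
    # Only reached when a bounded loop runs out of attempts.
    raise AssertionError('Test failed: Missing task results')


log = []
ok = join_sketch(FlakyResult(failures=2, value=[42]), max_retries=5, remark=log.append)
print(ok, len(log))  # prints: [42] 2
```

With max_retries=2 and three simulated timeouts, the same sketch falls through the loop and raises AssertionError, which mirrors how the original turns missing results into a test failure.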

Callers 15

__init__ · Method · 0.45
_start_worker_thread · Function · 0.45
get_issuer · Method · 0.45
__init__ · Method · 0.45
_pack · Method · 0.45
dedent · Function · 0.45
fill_paragraphs · Function · 0.45
join · Function · 0.45
indent · Function · 0.45
remove_repeating · Function · 0.45
saferepr · Function · 0.45
_repr · Function · 0.45

Calls 6

get · Method · 0.95
missing_results · Method · 0.95
remark · Method · 0.95
truncate · Function · 0.90
ResultSet · Method · 0.80
format · Method · 0.45

Tested by

no test coverage detected