github.com/celery/celery / test_iterdeps

Method test_iterdeps

t/unit/tasks/test_result.py:196–215
(self)

        ]

    def test_iterdeps(self):
        x = self.app.AsyncResult('1')
        c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
        x._cache = {'status': states.SUCCESS, 'result': None, 'children': c}
        for child in c:
            child.backend = Mock()
            child.backend.get_children.return_value = []
        it = x.iterdeps()
        assert list(it) == [
            (None, x),
            (x, c[0]),
            (x, c[1]),
            (x, c[2]),
        ]
        x._cache = None
        x.ready = Mock()
        x.ready.return_value = False
        with pytest.raises(IncompleteStream):
            list(x.iterdeps())
        list(x.iterdeps(intermediate=True))

    def test_eq_not_implemented(self):
        assert self.app.AsyncResult('1') != object()
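
The test above checks three behaviours of iterdeps: it yields (parent, node) pairs starting with (None, root), it visits the children of a ready node, and it raises IncompleteStream on an unready node unless intermediate=True is passed. The sketch below reproduces those behaviours with a hypothetical stand-in class. Node, its is_ready flag, and the local IncompleteStream are assumptions made for illustration. This is not Celery's implementation, and the breadth-first order is only one traversal consistent with what the test asserts.

```python
from collections import deque


class IncompleteStream(Exception):
    """Local stand-in for the exception the test expects (assumption)."""


class Node:
    """Hypothetical result node: an id, a ready flag, and child nodes."""

    def __init__(self, id, ready=True, children=None):
        self.id = id
        self.is_ready = ready
        self.children = children or []

    def iterdeps(self, intermediate=False):
        # Walk the graph breadth-first, yielding (parent, node) pairs.
        # The root has no parent, so its pair is (None, root).
        queue = deque([(None, self)])
        while queue:
            parent, node = queue.popleft()
            yield parent, node
            if node.is_ready:
                queue.extend((node, child) for child in node.children)
            elif not intermediate:
                # An unready node may still gain children, so the
                # stream is incomplete unless the caller opted in.
                raise IncompleteStream()


# Mirrors the first half of the test: a ready root with three children.
children = [Node(str(i)) for i in range(3)]
root = Node('root', children=children)
pairs = list(root.iterdeps())

# Mirrors the second half: an unready root.
pending = Node('pending', ready=False)
partial = list(pending.iterdeps(intermediate=True))
```

Because the generator yields each pair before checking readiness, the unready root is still emitted once. With intermediate=True the walk simply ends there instead of raising.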

Callers

nothing calls this directly

Calls 4

iterdeps · Method · 0.95
EagerResult · Class · 0.90
AsyncResult · Method · 0.45
raises · Method · 0.45

Tested by

no test coverage detected