(self)
| 194 | ] |
| 195 | |
| 196 | def test_iterdeps(self): |
| 197 | x = self.app.AsyncResult('1') |
| 198 | c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)] |
| 199 | x._cache = {'status': states.SUCCESS, 'result': None, 'children': c} |
| 200 | for child in c: |
| 201 | child.backend = Mock() |
| 202 | child.backend.get_children.return_value = [] |
| 203 | it = x.iterdeps() |
| 204 | assert list(it) == [ |
| 205 | (None, x), |
| 206 | (x, c[0]), |
| 207 | (x, c[1]), |
| 208 | (x, c[2]), |
| 209 | ] |
| 210 | x._cache = None |
| 211 | x.ready = Mock() |
| 212 | x.ready.return_value = False |
| 213 | with pytest.raises(IncompleteStream): |
| 214 | list(x.iterdeps()) |
| 215 | list(x.iterdeps(intermediate=True)) |
| 216 | |
| 217 | def test_eq_not_implemented(self): |
| 218 | assert self.app.AsyncResult('1') != object() |
nothing calls this directly
no test coverage detected