MCPcopy
hub / github.com/celery/celery / test_join_timeout

Method test_join_timeout

t/unit/tasks/test_result.py:845–859  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

843 assert len(list(ts.iter_native())) == 10
844
def test_join_timeout(self):
    """Exercise ``GroupResult.join`` with a timeout, with and without a callback.

    First, a group made of two ``MockAsyncResultSuccess`` members and one
    ``app.AsyncResult`` is joined with a very small timeout and is expected
    to raise ``TimeoutError``.

    Second, a group whose single member has its ``get`` replaced by a
    ``Mock`` is joined twice: without a callback the return value must be
    truthy; with a callback the return value must be falsy and the callback
    must have been called with the member's id and the value of ``get()``.
    """
    # Group expected to time out when joined.
    first_ok = MockAsyncResultSuccess(uuid(), app=self.app)
    second_ok = MockAsyncResultSuccess(uuid(), app=self.app)
    plain_result = self.app.AsyncResult(uuid())
    group = self.app.GroupResult(
        uuid(),
        [first_ok, second_ok, plain_result],
    )
    with pytest.raises(TimeoutError):
        group.join(timeout=0.0000001)

    # Group whose only member answers ``get`` through a Mock.
    mocked_result = self.app.AsyncResult(uuid())
    mocked_result.get = Mock()
    mocked_group = self.app.GroupResult(uuid(), [mocked_result])

    joined = mocked_group.join(timeout=0.1)
    assert joined

    callback = Mock(name='callback')
    joined_with_callback = mocked_group.join(timeout=0.1, callback=callback)
    assert not joined_with_callback
    callback.assert_called_with(mocked_result.id, mocked_result.get())
860
861 def test_iter_native_when_empty_group(self):
862 ts = self.app.GroupResult(uuid(), [])

Callers

nothing calls this directly

Calls 6

get (Method) · 0.95
GroupResult (Method) · 0.80
AsyncResult (Method) · 0.45
raises (Method) · 0.45
join (Method) · 0.45

Tested by

no test coverage detected