MCPcopy
hub / github.com/celery/celery / test_get_sync_subtask_option

Method test_get_sync_subtask_option

t/unit/tasks/test_result.py:985–991  ·  view source on GitHub ↗
(self, task_join_will_block)

Source from the content-addressed store, hash-verified

983
984 @patch('celery.result.task_join_will_block')
985 def test_get_sync_subtask_option(self, task_join_will_block):
986 task_join_will_block.return_value = True
987 tid = uuid()
988 res_subtask_async = EagerResult(tid, 'x', 'x', states.SUCCESS)
989 with pytest.raises(RuntimeError):
990 res_subtask_async.get()
991 res_subtask_async.get(disable_sync_subtasks=False)
992
993 def test_populate_name(self):
994 res = EagerResult('x', 'x', states.SUCCESS, None, 'test_task')

Callers

nothing calls this directly

Calls 3

getMethod · 0.95
EagerResultClass · 0.90
raisesMethod · 0.45

Tested by

no test coverage detected