(self, task_join_will_block)
| 111 | |
@patch('celery.result.task_join_will_block')
def test_get_sync_subtask_option(self, task_join_will_block):
    """Check the ``disable_sync_subtasks`` switch of ``AsyncResult.get``.

    ``task_join_will_block`` is patched to report True. Under that
    condition a plain ``get()`` is expected to raise ``RuntimeError``,
    while ``get(disable_sync_subtasks=False)`` is expected to complete
    without raising.
    """
    # Make the patched check report that joining would block.
    task_join_will_block.return_value = True
    result = AsyncResult(uuid(), backend=_MockBackend())

    # Default call: the guard must reject the synchronous wait.
    with pytest.raises(RuntimeError):
        result.get()

    # Explicit opt-out: the same call must go through.
    result.get(disable_sync_subtasks=False)
| 121 | |
| 122 | def test_without_id(self): |
| 123 | with pytest.raises(ValueError): |
nothing calls this directly
no test coverage detected