MCPcopy
hub / github.com/celery/celery / test_basic_task

Method test_basic_task

t/integration/test_tasks.py:76–98  ·  view source on GitHub ↗

Tests basic task call

(self, manager)

Source from the content-addressed store, hash-verified

74
75 @flaky
76 def test_basic_task(self, manager):
77 """Tests basic task call"""
78 results = []
79 # Tests calling task only with args
80 for i in range(10):
81 results.append([i + i, add.delay(i, i)])
82 for expected, result in results:
83 value = result.get(timeout=10)
84 assert value == expected
85 assert result.status == 'SUCCESS'
86 assert result.ready() is True
87 assert result.successful() is True
88
89 results = []
90 # Tests calling task with args and kwargs
91 for i in range(10):
92 results.append([3*i, add.delay(i, i, z=i)])
93 for expected, result in results:
94 value = result.get(timeout=10)
95 assert value == expected
96 assert result.status == 'SUCCESS'
97 assert result.ready() is True
98 assert result.successful() is True
99
100 @flaky
101 @pytest.mark.skip(reason="Broken test")

Callers

nothing calls this directly

Calls 4

delayMethod · 0.45
getMethod · 0.45
readyMethod · 0.45
successfulMethod · 0.45

Tested by

no test coverage detected