MCPcopy
hub / github.com/celery/celery / test_scheduled

Method test_scheduled

t/integration/test_inspect.py:127–155  ·  view source on GitHub ↗

Tests listing scheduled tasks

(self, inspect)

Source from the content-addressed store, hash-verified

125
126 @flaky
127 def test_scheduled(self, inspect):
128 """Tests listing scheduled tasks"""
129 exec_time = datetime.now(timezone.utc) + timedelta(seconds=5)
130 res = add.apply_async([1, 2], {'z': 3}, eta=exec_time)
131 ret = inspect.scheduled()
132 assert len(ret) == 1
133 assert ret[NODENAME] == [
134 {
135 'eta': exec_time.strftime('%Y-%m-%dT%H:%M:%S.%f') + '+00:00',
136 'priority': 6,
137 'request': {
138 'id': res.task_id,
139 'name': 't.integration.tasks.add',
140 'args': [1, 2],
141 'kwargs': {'z': 3},
142 'type': 't.integration.tasks.add',
143 'hostname': ANY,
144 'time_start': None,
145 'acknowledged': False,
146 'delivery_info': {
147 'exchange': '',
148 'routing_key': 'celery',
149 'priority': 0,
150 'redelivered': False
151 },
152 'worker_pid': None
153 }
154 }
155 ]
156
157 @flaky
158 def test_query_task(self, inspect):

Callers

nothing calls this directly

Calls 3

scheduledMethod · 0.80
nowMethod · 0.45
apply_asyncMethod · 0.45

Tested by

no test coverage detected