hub / github.com/celery/celery / test_active

Method test_active

t/integration/test_inspect.py:100–124

Tests listing active tasks

(self, inspect)

Source from the content-addressed store, hash-verified

@flaky
def test_active(self, inspect):
    """Tests listing active tasks"""
    res = sleeping.delay(5)
    sleep(1)
    ret = inspect.active()
    assert len(ret) == 1
    assert ret[NODENAME] == [
        {
            'id': res.task_id,
            'name': 't.integration.tasks.sleeping',
            'args': [5],
            'kwargs': {},
            'type': 't.integration.tasks.sleeping',
            'hostname': ANY,
            'time_start': ANY,
            'acknowledged': True,
            'delivery_info': {
                'exchange': '',
                'routing_key': 'celery',
                'priority': 0,
                'redelivered': False
            },
            'worker_pid': ANY
        }
    ]
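The assertion above pins the fields that are deterministic and uses ANY for those that vary per run (hostname, start time, worker PID). The sketch below illustrates that matching pattern in isolation, assuming ANY is `unittest.mock.ANY` (the listing does not show the import). The reply dictionary, node name, task id, and the helper `is_expected_active_reply` are hypothetical stand-ins, not values or functions from the Celery test suite.

```python
import copy
from unittest.mock import ANY

# Hypothetical values standing in for NODENAME and res.task_id.
NODENAME = "worker@example"
TASK_ID = "3f2b6c1e-0000-4000-8000-000000000000"


def is_expected_active_reply(reply, node, task_id):
    """Return True if `reply` has the shape the test asserts on.

    Fields that differ between runs are compared against ANY,
    which compares equal to every object.
    """
    expected = [
        {
            "id": task_id,
            "name": "t.integration.tasks.sleeping",
            "args": [5],
            "kwargs": {},
            "type": "t.integration.tasks.sleeping",
            "hostname": ANY,
            "time_start": ANY,
            "acknowledged": True,
            "delivery_info": {
                "exchange": "",
                "routing_key": "celery",
                "priority": 0,
                "redelivered": False,
            },
            "worker_pid": ANY,
        }
    ]
    return len(reply) == 1 and reply.get(node) == expected


# A made-up reply shaped like the one the test expects.
reply = {
    NODENAME: [
        {
            "id": TASK_ID,
            "name": "t.integration.tasks.sleeping",
            "args": [5],
            "kwargs": {},
            "type": "t.integration.tasks.sleeping",
            "hostname": "worker@example",
            "time_start": 1700000000.25,
            "acknowledged": True,
            "delivery_info": {
                "exchange": "",
                "routing_key": "celery",
                "priority": 0,
                "redelivered": False,
            },
            "worker_pid": 4242,
        }
    ]
}

# Same reply with one pinned field changed; ANY does not mask this.
changed = copy.deepcopy(reply)
changed[NODENAME][0]["acknowledged"] = False

matches = is_expected_active_reply(reply, NODENAME, TASK_ID)
matches_changed = is_expected_active_reply(changed, NODENAME, TASK_ID)
```

Because ANY only relaxes the fields it replaces, a change to any pinned field such as `acknowledged` or `routing_key` still makes the comparison fail.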

Callers

nothing calls this directly

Calls 2

delay · Method · 0.45
active · Method · 0.45

Tested by

no test coverage detected