Tests listing active tasks
(self, inspect)
| 98 | |
| 99 | @flaky |
| 100 | def test_active(self, inspect): |
| 101 | """Tests listing active tasks""" |
| 102 | res = sleeping.delay(5) |
| 103 | sleep(1) |
| 104 | ret = inspect.active() |
| 105 | assert len(ret) == 1 |
| 106 | assert ret[NODENAME] == [ |
| 107 | { |
| 108 | 'id': res.task_id, |
| 109 | 'name': 't.integration.tasks.sleeping', |
| 110 | 'args': [5], |
| 111 | 'kwargs': {}, |
| 112 | 'type': 't.integration.tasks.sleeping', |
| 113 | 'hostname': ANY, |
| 114 | 'time_start': ANY, |
| 115 | 'acknowledged': True, |
| 116 | 'delivery_info': { |
| 117 | 'exchange': '', |
| 118 | 'routing_key': 'celery', |
| 119 | 'priority': 0, |
| 120 | 'redelivered': False |
| 121 | }, |
| 122 | 'worker_pid': ANY |
| 123 | } |
| 124 | ] |
| 125 | |
| 126 | @flaky |
| 127 | def test_scheduled(self, inspect): |