(self)
| 247 | assert ('worker-heartbeat',) in event_dispatcher.send.call_args |
| 248 | |
| 249 | def test_time_limit(self): |
| 250 | panel = self.create_panel(consumer=Mock()) |
| 251 | r = panel.handle('time_limit', arguments={ |
| 252 | 'task_name': self.mytask.name, 'hard': 30, 'soft': 10}) |
| 253 | assert self.mytask.time_limit == 30 |
| 254 | assert self.mytask.soft_time_limit == 10 |
| 255 | assert 'ok' in r |
| 256 | r = panel.handle('time_limit', arguments={ |
| 257 | 'task_name': self.mytask.name, 'hard': None, 'soft': None}) |
| 258 | assert self.mytask.time_limit is None |
| 259 | assert self.mytask.soft_time_limit is None |
| 260 | assert 'ok' in r |
| 261 | |
| 262 | r = panel.handle('time_limit', arguments={ |
| 263 | 'task_name': '248e8afya9s8dh921eh928', 'hard': 30}) |
| 264 | assert 'error' in r |
| 265 | |
| 266 | def test_active_queues(self): |
| 267 | import kombu |
nothing calls this directly
no test coverage detected