MCPcopy
hub / github.com/celery/celery / test_time_limit

Method test_time_limit

t/unit/worker/test_control.py:249–264  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

247 assert ('worker-heartbeat',) in event_dispatcher.send.call_args
248
def test_time_limit(self):
    """Exercise the panel's ``time_limit`` command for a task.

    Three requests are sent through ``panel.handle``:

    1. hard=30 / soft=10 for ``self.mytask`` -- the task's
       ``time_limit`` and ``soft_time_limit`` must then hold those values.
    2. hard=None / soft=None for the same task -- both attributes must
       then be ``None``.
    3. hard=30 for a task name that is presumably not registered
       (NOTE(review): confirm against the fixture's task registry) --
       the reply must contain ``'error'``.

    The first two replies must contain ``'ok'``.
    """
    panel = self.create_panel(consumer=Mock())

    # Step 1: apply concrete hard/soft limits.
    set_limits = {'task_name': self.mytask.name, 'hard': 30, 'soft': 10}
    reply = panel.handle('time_limit', arguments=set_limits)
    assert self.mytask.time_limit == 30
    assert self.mytask.soft_time_limit == 10
    assert 'ok' in reply

    # Step 2: clear both limits again by sending None.
    clear_limits = {
        'task_name': self.mytask.name, 'hard': None, 'soft': None,
    }
    reply = panel.handle('time_limit', arguments=clear_limits)
    assert self.mytask.time_limit is None
    assert self.mytask.soft_time_limit is None
    assert 'ok' in reply

    # Step 3: a bogus task name is expected to yield an error reply.
    bogus_request = {'task_name': '248e8afya9s8dh921eh928', 'hard': 30}
    reply = panel.handle('time_limit', arguments=bogus_request)
    assert 'error' in reply
265
266 def test_active_queues(self):
267 import kombu

Callers

nothing calls this directly

Calls 2

create_panelMethod · 0.95
handleMethod · 0.45

Tested by

no test coverage detected