MCPcopy
hub / github.com/celery/celery / test_revoke_terminate

Method test_revoke_terminate

t/unit/worker/test_control.py:533–548  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

531 assert tid + 'xxx' not in revoked
532
533 def test_revoke_terminate(self):
534 request = Mock()
535 request.id = tid = uuid()
536 state = self.create_state()
537 state.consumer = Mock()
538 worker_state.task_reserved(request)
539 try:
540 r = control.revoke(state, tid, terminate=True)
541 assert tid in revoked
542 assert request.terminate.call_count
543 assert 'terminate:' in r['ok']
544 # unknown task id only revokes
545 r = control.revoke(state, uuid(), terminate=True)
546 assert 'tasks unknown' in r['ok']
547 finally:
548 worker_state.task_ready(request)
549
550 @pytest.mark.parametrize(
551 "terminate", [True, False],

Callers

nothing calls this directly

Calls 3

create_stateMethod · 0.95
task_readyMethod · 0.80
revokeMethod · 0.45

Tested by

no test coverage detected