(self)
| 531 | assert tid + 'xxx' not in revoked |
| 532 | |
| 533 | def test_revoke_terminate(self): |
| 534 | request = Mock() |
| 535 | request.id = tid = uuid() |
| 536 | state = self.create_state() |
| 537 | state.consumer = Mock() |
| 538 | worker_state.task_reserved(request) |
| 539 | try: |
| 540 | r = control.revoke(state, tid, terminate=True) |
| 541 | assert tid in revoked |
| 542 | assert request.terminate.call_count |
| 543 | assert 'terminate:' in r['ok'] |
| 544 | # unknown task id only revokes |
| 545 | r = control.revoke(state, uuid(), terminate=True) |
| 546 | assert 'tasks unknown' in r['ok'] |
| 547 | finally: |
| 548 | worker_state.task_ready(request) |
| 549 | |
| 550 | @pytest.mark.parametrize( |
| 551 | "terminate", [True, False], |
nothing calls this directly
no test coverage detected