MCPcopy
hub / github.com/celery/celery / test_process_cleanup_fails

Method test_process_cleanup_fails

t/unit/worker/test_request.py:114–122  ·  view source on GitHub ↗
(self, patching)

Source from the content-addressed store, hash-verified

112class test_trace_task(RequestCase):
113
114 def test_process_cleanup_fails(self, patching):
115 _logger = patching('celery.app.trace.logger')
116 self.mytask.backend = Mock()
117 self.mytask.backend.process_cleanup = Mock(side_effect=KeyError())
118 tid = uuid()
119 ret = jail(self.app, tid, self.mytask.name, {}, [2], {})
120 assert ret == 4
121 self.mytask.backend.mark_as_done.assert_called()
122 assert 'Process cleanup failed' in _logger.error.call_args[0][0]
123
124 def test_process_cleanup_BaseException(self):
125 self.mytask.backend = Mock()

Callers

nothing calls this directly

Calls 2

patchingFunction · 0.85
jailFunction · 0.85

Tested by

no test coverage detected