(self, report_internal_error, build_tracer, send)
| 442 | @patch('celery.app.trace.build_tracer') |
| 443 | @patch('celery.app.trace.report_internal_error') |
| 444 | def test_outside_body_error(self, report_internal_error, build_tracer, send): |
| 445 | tracer = Mock() |
| 446 | tracer.side_effect = KeyError('foo') |
| 447 | build_tracer.return_value = tracer |
| 448 | |
| 449 | @self.app.task(shared=False) |
| 450 | def xtask(): |
| 451 | pass |
| 452 | |
| 453 | trace_task(xtask, 'uuid', (), {}) |
| 454 | assert report_internal_error.call_count |
| 455 | assert send.call_count |
| 456 | assert xtask.__trace__ is tracer |
| 457 | |
| 458 | def test_backend_error_should_report_failure(self): |
| 459 | """check internal error is reported as failure. |
nothing calls this directly
no test coverage detected