(self)
| 1053 | worker.signal_consumer_close() |
| 1054 | |
| 1055 | def test_rusage__no_resource(self): |
| 1056 | from celery.worker import worker |
| 1057 | prev, worker.resource = worker.resource, None |
| 1058 | try: |
| 1059 | self.worker.pool = Mock(name='pool') |
| 1060 | with pytest.raises(NotImplementedError): |
| 1061 | self.worker.rusage() |
| 1062 | self.worker.stats() |
| 1063 | finally: |
| 1064 | worker.resource = prev |
| 1065 | |
| 1066 | def test_repr(self): |
| 1067 | assert repr(self.worker) |