(self)
| 466 | _state._tls.current_app = current |
| 467 | |
| 468 | def test_current_task(self): |
| 469 | @self.app.task |
| 470 | def foo(shared=False): |
| 471 | pass |
| 472 | |
| 473 | _state._task_stack.push(foo) |
| 474 | try: |
| 475 | assert self.app.current_task.name == foo.name |
| 476 | finally: |
| 477 | _state._task_stack.pop() |
| 478 | |
| 479 | def test_task_not_shared(self): |
| 480 | with patch('celery.app.base.connect_on_app_finalize') as sh: |