(self)
| 882 | i.annotate() |
| 883 | |
def test_apply_async_adds_children(self):
    """``apply_async(add_to_parent=True)`` result lands in the parent's children.

    Two bound, non-shared tasks are registered on ``self.app``. The first
    is pushed onto ``_task_stack`` and given a request created with
    ``called_directly=False``; the second is then dispatched with
    ``add_to_parent=True``. The returned result must appear in the first
    task's ``request.children``. Both pushes are undone in ``finally``
    blocks, so the stack and request are popped even if the assertion fails.
    """
    from celery._state import _task_stack as task_stack

    @self.app.task(bind=True, shared=False)
    def a3cX1(self):
        pass

    @self.app.task(bind=True, shared=False)
    def a3cX2(self):
        pass

    # Readable aliases for the two task objects; the underlying functions
    # keep their original names.
    parent, child = a3cX1, a3cX2

    # NOTE(review): presumably pushing onto the task stack is what makes
    # ``parent`` count as the currently executing task -- confirm against
    # celery._state.
    task_stack.push(parent)
    try:
        parent.push_request(called_directly=False)
        try:
            child_result = child.apply_async(add_to_parent=True)
            recorded_children = parent.request.children
            assert child_result in recorded_children
        finally:
            # Undo push_request() before the outer block unwinds the stack.
            parent.pop_request()
    finally:
        task_stack.pop()
| 905 | |
| 906 | def test_pickle_app(self): |
| 907 | changes = {'THE_FOO_BAR': 'bars', |
nothing calls this directly
no test coverage detected