(fun)
def create_shared_task(**options):
    """Build the decorator that turns a function into a shared task.

    ``options`` are forwarded unchanged to ``app._task_from_fun`` every
    time the decorated function is registered with an app.  The decorator
    returns ``Proxy(task_by_cons)``, where ``task_by_cons`` fetches the
    task from the current app's task registry.
    """

    def __inner(fun):
        # Read the explicit task name once, at decoration time.
        explicit_name = options.get('name')

        def register_with(app):
            # Single registration routine shared by the finalize hook and
            # the loop over already-finalized apps below.
            return app._task_from_fun(fun, **options)

        # Set as shared task so that unfinalized apps,
        # and future apps will register a copy of this task.
        _state.connect_on_app_finalize(register_with)

        # Force all finalized apps to take this task as well.
        for active_app in _state._get_active_apps():
            if not active_app.finalized:
                continue
            with active_app._finalize_mutex:
                register_with(active_app)

        # Return a proxy that always gets the task from the current
        # app's task registry.
        def task_by_cons():
            current = _state.get_current_app()
            # Read the registry before computing the key, so attribute
            # access and name generation happen in the original order.
            registry = current.tasks
            task_key = explicit_name or current.gen_task_name(
                fun.__name__, fun.__module__,
            )
            return registry[task_key]

        return Proxy(task_by_cons)

    return __inner
| 73 | |
| 74 | if len(args) == 1 and callable(args[0]): |
nothing calls this directly
no test coverage detected