Start shell session with convenient access to celery symbols. The following symbols will be added to the main globals: - ``celery``: the current application. - ``chord``, ``group``, ``chain``, ``chunks``, ``xmap``, ``xstarmap`` ``subtask``, ``Task`` - all registered tasks.
(ctx, ipython=False, bpython=False,
python=False, without_tasks=False, eventlet=False,
gevent=False, **kwargs)
| 118 | @click.pass_context |
| 119 | @handle_preload_options |
| 120 | def shell(ctx, ipython=False, bpython=False, |
| 121 | python=False, without_tasks=False, eventlet=False, |
| 122 | gevent=False, **kwargs): |
| 123 | """Start shell session with convenient access to celery symbols. |
| 124 | |
| 125 | The following symbols will be added to the main globals: |
| 126 | - ``celery``: the current application. |
| 127 | - ``chord``, ``group``, ``chain``, ``chunks``, |
| 128 | ``xmap``, ``xstarmap`` ``subtask``, ``Task`` |
| 129 | - all registered tasks. |
| 130 | """ |
| 131 | sys.path.insert(0, os.getcwd()) |
| 132 | if eventlet: |
| 133 | import_module('celery.concurrency.eventlet') |
| 134 | if gevent: |
| 135 | import_module('celery.concurrency.gevent') |
| 136 | import celery |
| 137 | app = ctx.obj.app |
| 138 | app.loader.import_default_modules() |
| 139 | |
| 140 | # pylint: disable=attribute-defined-outside-init |
| 141 | locals = { |
| 142 | 'app': app, |
| 143 | 'celery': app, |
| 144 | 'Task': celery.Task, |
| 145 | 'chord': celery.chord, |
| 146 | 'group': celery.group, |
| 147 | 'chain': celery.chain, |
| 148 | 'chunks': celery.chunks, |
| 149 | 'xmap': celery.xmap, |
| 150 | 'xstarmap': celery.xstarmap, |
| 151 | 'subtask': celery.subtask, |
| 152 | 'signature': celery.signature, |
| 153 | } |
| 154 | |
| 155 | if not without_tasks: |
| 156 | locals.update({ |
| 157 | task.__name__: task for task in app.tasks.values() |
| 158 | if not task.name.startswith('celery.') |
| 159 | }) |
| 160 | |
| 161 | if python: |
| 162 | _invoke_fallback_shell(locals) |
| 163 | elif bpython: |
| 164 | try: |
| 165 | _invoke_bpython_shell(locals) |
| 166 | except ImportError: |
| 167 | ctx.obj.echo(f'{ctx.obj.ERROR}: bpython is not installed') |
| 168 | elif ipython: |
| 169 | try: |
| 170 | _invoke_ipython_shell(locals) |
| 171 | except ImportError as e: |
| 172 | ctx.obj.echo(f'{ctx.obj.ERROR}: {e}') |
| 173 | _invoke_default_shell(locals) |
nothing calls this directly
no test coverage detected