(camera, app, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, workdir=None,
detach=False, **kwargs)
| 22 | |
| 23 | |
| 24 | def _run_evcam(camera, app, logfile=None, pidfile=None, uid=None, |
| 25 | gid=None, umask=None, workdir=None, |
| 26 | detach=False, **kwargs): |
| 27 | from celery.events.snapshot import evcam |
| 28 | _set_process_status('cam') |
| 29 | kwargs['app'] = app |
| 30 | cam = partial(evcam, camera, |
| 31 | logfile=logfile, pidfile=pidfile, **kwargs) |
| 32 | |
| 33 | if detach: |
| 34 | with detached(logfile, pidfile, uid, gid, umask, workdir): |
| 35 | return cam() |
| 36 | else: |
| 37 | return cam() |
| 38 | |
| 39 | |
| 40 | def _run_evtop(app): |
no test coverage detected