hub / github.com/celery/celery / evcam

Function evcam

celery/events/snapshot.py:86–111

Start snapshot recorder.

(camera, freq=1.0, maxrate=None, loglevel=0,
 logfile=None, pidfile=None, timer=None, app=None,
 **kwargs)

Source from the content-addressed store, hash-verified

def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
          logfile=None, pidfile=None, timer=None, app=None,
          **kwargs):
    """Start snapshot recorder."""
    app = app_or_default(app)

    if pidfile:
        platforms.create_pidlock(pidfile)

    app.log.setup_logging_subsystem(loglevel, logfile)

    print(f'-> evcam: Taking snapshots with {camera} (every {freq} secs.)')
    state = app.events.State()
    cam = instantiate(camera, state, app=app, freq=freq,
                      maxrate=maxrate, timer=timer)
    cam.install()
    conn = app.connection_for_read()
    recv = app.events.Receiver(conn, handlers={'*': state.event})
    try:
        try:
            recv.capture(limit=None)
        except KeyboardInterrupt:
            raise SystemExit
    finally:
        cam.cancel()
        conn.close()
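
The nested try blocks at the end of the listing do two separate jobs: the inner one turns Ctrl-C into a quiet SystemExit, and the outer finally releases the camera and the connection however capture ends. The sketch below reproduces only that control flow using the standard library. FakeCamera, FakeConnection, run_capture and interrupted are hypothetical stand-ins invented for illustration, not Celery classes or functions.

```python
class FakeCamera:
    """Hypothetical stand-in for the camera object; records cancel()."""

    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True


class FakeConnection:
    """Hypothetical stand-in for the broker connection; records close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_capture(capture, cam, conn):
    """Run capture(); map Ctrl-C to SystemExit and always clean up."""
    try:
        try:
            capture()
        except KeyboardInterrupt:
            # Exit quietly instead of surfacing a KeyboardInterrupt traceback.
            raise SystemExit
    finally:
        # Runs on normal return, on SystemExit, and on any other exception.
        cam.cancel()
        conn.close()


def interrupted():
    """Simulate the user pressing Ctrl-C while events are being captured."""
    raise KeyboardInterrupt


cam, conn = FakeCamera(), FakeConnection()
try:
    run_capture(interrupted, cam, conn)
except SystemExit:
    exited = True
else:
    exited = False

print(exited, cam.cancelled, conn.closed)
```

Keeping the cleanup in the outer finally means an unexpected error during capture still cancels the camera and closes the connection before the exception propagates.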

Callers 2

test_evcam · Method · 0.90
test_evcam_pidfile · Method · 0.90

Calls 9

instantiate · Function · 0.90
State · Method · 0.80
install · Method · 0.45
connection_for_read · Method · 0.45
Receiver · Method · 0.45
capture · Method · 0.45
cancel · Method · 0.45
close · Method · 0.45
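
The instantiate entry in the call list is where the camera argument becomes an object. The page does not show how instantiate works, so the following is only a rough sketch, assuming camera is given as a dotted import path that is resolved by name and then called with the remaining arguments. instantiate_by_name is a hypothetical helper written for this illustration; the real helper may accept other forms, such as a class object.

```python
import importlib


def instantiate_by_name(path, *args, **kwargs):
    """Import 'package.module.ClassName' and call it with the given args.

    Hypothetical helper for illustration; not Celery's implementation.
    """
    # Split 'package.module.ClassName' into the module path and the attribute.
    module_name, _, attr = path.rpartition('.')
    cls = getattr(importlib.import_module(module_name), attr)
    return cls(*args, **kwargs)


# Any importable class works; fractions.Fraction is used purely as a demo.
half = instantiate_by_name('fractions.Fraction', 1, 2)
print(half)
```

Passing the class as a string lets a command-line tool choose the camera implementation without importing it up front.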

Tested by 2

test_evcamMethod · 0.72
test_evcam_pidfileMethod · 0.72