(self, w)
| 193 | super().__init__(w, beat=beat, **kwargs) |
| 194 | |
def create(self, w):
    """Create the embedded beat service and attach it to the worker.

    Arguments:
        w: The worker instance.  Reads ``w.pool_cls``, ``w.app``,
            ``w.schedule_filename`` and ``w.scheduler``; sets ``w.beat``.

    Returns:
        The new ``EmbeddedService`` instance (also stored on ``w.beat``).

    Raises:
        ImproperlyConfigured: If the pool's module name ends with
            ``gevent`` or ``eventlet``.
    """
    # Function-scope import kept from the original
    # (presumably to avoid an import cycle -- confirm).
    from celery.beat import EmbeddedService

    green_suffixes = ('gevent', 'eventlet')
    pool_cls = w.pool_cls
    if isinstance(pool_cls, str):
        # A string may be a bare name ('gevent') or a qualified path
        # ('pkg.gevent:TaskPool' or 'pkg.gevent.TaskPool').  Checking only
        # the raw string misses the qualified forms because they end with
        # the class name, so the module part is checked as well.
        head, colon, _ = pool_cls.partition(':')
        module_part = head if colon else pool_cls.rpartition('.')[0]
        candidates = (pool_cls, module_part)
    else:
        candidates = (pool_cls.__module__,)

    if any(name.endswith(green_suffixes) for name in candidates):
        raise ImproperlyConfigured(ERR_B_GREEN)

    b = w.beat = EmbeddedService(
        w.app,
        schedule_filename=w.schedule_filename,
        scheduler_cls=w.scheduler,
    )
    return b
| 206 | |
| 207 | |
| 208 | class StateDB(bootsteps.Step): |
nothing calls this directly
no test coverage detected