Unmanaged worker instance.
| 61 | |
| 62 | |
class WorkController:
    """Unmanaged worker instance.

    Constructing an instance resolves the app and node name, records the
    start time, asks the app loader to initialise the worker, runs the
    ``on_before_init`` / ``setup_defaults`` / ``on_after_init`` hooks and
    finally passes the prepared arguments on to :meth:`setup_instance`.
    """

    #: Application instance; ``__init__`` replaces it when given a truthy
    #: ``app`` argument, otherwise this class-level value is reused.
    app = None

    # Class-level defaults; nothing in this section assigns them.
    pidlock = None
    blueprint = None
    pool = None
    semaphore = None

    #: contains the exit code if a :exc:`SystemExit` event is handled.
    exitcode = None

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""

        name = 'Worker'

        # Component references, written as 'module.path:Name' strings.
        default_steps = {
            'celery.worker.components:Hub',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }

    def __init__(self, app=None, hostname=None, **kwargs):
        """Initialise the worker and run its setup hooks.

        Arguments:
            app: Application to bind to; when falsy the class-level
                ``app`` attribute is used instead.
            hostname: Passed through ``default_nodename`` to produce
                ``self.hostname``.
            **kwargs: Forwarded unchanged to ``on_before_init``,
                ``setup_defaults``, ``on_after_init`` and
                ``prepare_args``; the mapping returned by
                ``prepare_args`` is expanded into ``setup_instance``.
        """
        self.app = app if app else self.app
        self.hostname = default_nodename(hostname)
        # Timezone-aware (UTC) timestamp of construction.
        self.startup_time = datetime.now(timezone.utc)
        self.app.loader.init_worker()

        # Hooks run in a fixed order around the defaults being applied.
        self.on_before_init(**kwargs)
        self.setup_defaults(**kwargs)
        self.on_after_init(**kwargs)

        instance_options = self.prepare_args(**kwargs)
        self.setup_instance(**instance_options)

    def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                       include=None, use_eventloop=None, exclude_queues=None,
                       **kwargs):
        """Configure queues, includes, concurrency and logging level.

        Arguments:
            queues: Handed to ``setup_queues`` together with
                ``exclude_queues``.
            ready_callback: Stored as ``self.ready_callback``; when falsy
                ``self.on_consumer_ready`` is stored instead.
            pidfile: Stored as ``self.pidfile``.
            include: Converted with ``str_to_list`` and handed to
                ``setup_includes``.
            use_eventloop: Accepted but not referenced in the portion of
                the method shown here.
            exclude_queues: Handed to ``setup_queues`` after ``queues``.
            **kwargs: Accepted but not referenced in the portion of the
                method shown here.
        """
        self.pidfile = pidfile
        self.setup_queues(queues, exclude_queues)
        include_list = str_to_list(include)
        self.setup_includes(include_list)

        # Default concurrency: the CPU count, or 2 when the platform
        # cannot report one.
        if not self.concurrency:
            try:
                self.concurrency = cpu_count()
            except NotImplementedError:
                self.concurrency = 2

        # Options
        self.loglevel = mlevel(self.loglevel)
        if ready_callback:
            self.ready_callback = ready_callback
        else:
            self.ready_callback = self.on_consumer_ready

        # this connection won't establish, only used for params
        self._conninfo = self.app.connection_for_read()
no outgoing calls
no test coverage detected