(self, queues=None, ready_callback=None, pidfile=None,
include=None, use_eventloop=None, exclude_queues=None,
**kwargs)
| 99 | self.setup_instance(**self.prepare_args(**kwargs)) |
| 100 | |
def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                   include=None, use_eventloop=None, exclude_queues=None,
                   **kwargs):
    """Populate instance state from the options and apply the blueprint.

    Arguments:
        queues: Forwarded to ``setup_queues`` as its first argument.
        ready_callback: Stored as ``self.ready_callback``; when falsy,
            ``self.on_consumer_ready`` is stored instead.
        pidfile: Stored unchanged as ``self.pidfile``.
        include: Run through ``str_to_list`` and handed to
            ``setup_includes``.
        use_eventloop: Stored as ``self.use_eventloop``; when ``None``
            the result of ``self.should_use_eventloop()`` is stored.
        exclude_queues: Forwarded to ``setup_queues`` as its second
            argument.
        **kwargs: Remaining options; kept as ``self.options`` and passed
            on to ``self.blueprint.apply``.
    """
    self.pidfile = pidfile
    self.setup_queues(queues, exclude_queues)
    include_list = str_to_list(include)
    self.setup_includes(include_list)

    # No concurrency configured: use the CPU count, or 2 when the
    # platform cannot report one.
    if not self.concurrency:
        try:
            self.concurrency = cpu_count()
        except NotImplementedError:
            self.concurrency = 2

    # Options
    self.loglevel = mlevel(self.loglevel)
    if ready_callback:
        self.ready_callback = ready_callback
    else:
        self.ready_callback = self.on_consumer_ready

    # This connection is never established; it only carries parameters.
    self._conninfo = self.app.connection_for_read()
    if use_eventloop is None:
        use_eventloop = self.should_use_eventloop()
    self.use_eventloop = use_eventloop
    self.options = kwargs

    # The signal goes out after the options above are set and before
    # any bootstep work starts.
    signals.worker_init.send(sender=self)

    # Bootsteps: resolve the pool implementation, reset the step list,
    # run the subclass hook, then build and apply the blueprint.
    self.pool_cls = _concurrency.get_implementation(self.pool_cls)
    self.steps = []
    self.on_init_blueprint()
    blueprint_factory = self.Blueprint
    self.blueprint = blueprint_factory(
        steps=self.app.steps['worker'],
        on_start=self.on_start,
        on_close=self.on_close,
        on_stopped=self.on_stopped,
    )
    self.blueprint.apply(self, **kwargs)
| 140 | |
def on_init_blueprint(self):
    """Hook run by ``setup_instance`` right before the blueprint is built.

    The default implementation does nothing.
    """
no test coverage detected