Bootstep managing the worker pool. Describes how to initialize the worker pool, and starts and stops the pool during worker start-up/shutdown. Adds attributes: autoscale, pool, max_concurrency, min_concurrency.
| 99 | |
| 100 | |
| 101 | class Pool(bootsteps.StartStopStep): |
| 102 | """Bootstep managing the worker pool. |
| 103 | |
| 104 | Describes how to initialize the worker pool, and starts and stops |
| 105 | the pool during worker start-up/shutdown. |
| 106 | |
| 107 | Adds attributes: |
| 108 | |
| 109 | * autoscale |
| 110 | * pool |
| 111 | * max_concurrency |
| 112 | * min_concurrency |
| 113 | """ |
| 114 | |
| 115 | requires = (Hub,) |
| 116 | |
def __init__(self, w, autoscale=None, **kwargs):
    """Initialize the pool-related attributes on the worker.

    Args:
        w: The worker being bootstrapped.  Its ``pool``,
            ``max_concurrency``, ``min_concurrency`` and ``autoscale``
            attributes are set here; ``w.concurrency`` and
            ``w.optimization`` are read.
        autoscale: ``None``, a ``"max,min"`` string (the ``,min`` part
            is optional and defaults to 0), or an already parsed
            two-item ``(max, min)`` sequence.
        **kwargs: Forwarded unchanged to the parent class ``__init__``.

    Raises:
        ValueError: If a string ``autoscale`` has a part that ``int()``
            cannot parse.
    """
    w.pool = None
    w.max_concurrency = None
    # Default lower bound is the configured concurrency; it is only
    # overridden below when autoscale limits are supplied.
    w.min_concurrency = w.concurrency
    self.optimization = w.optimization
    if isinstance(autoscale, str):
        max_c, _, min_c = autoscale.partition(',')
        # An absent minimum ("10" or "10,") leaves ``min_c`` empty -> 0.
        autoscale = [int(max_c), int(min_c) if min_c else 0]
    w.autoscale = autoscale
    if w.autoscale:
        w.max_concurrency, w.min_concurrency = w.autoscale
    super().__init__(w, **kwargs)
| 129 | |
def close(self, w):
    """Call ``close()`` on the worker's pool, if the worker has one.

    Args:
        w: The worker whose ``pool`` attribute is inspected.  Nothing
            happens when ``w.pool`` is falsy (e.g. still ``None``).
    """
    pool = w.pool
    if not pool:
        return
    pool.close()
| 133 | |
def terminate(self, w):
    """Call ``terminate()`` on the worker's pool, if the worker has one.

    Args:
        w: The worker whose ``pool`` attribute is inspected.  Nothing
            happens when ``w.pool`` is falsy (e.g. still ``None``).
    """
    active_pool = w.pool
    if not active_pool:
        return
    active_pool.terminate()
| 137 | |
| 138 | def create(self, w): |
| 139 | semaphore = None |
| 140 | max_restarts = None |
| 141 | if w.app.conf.worker_pool in GREEN_POOLS: # pragma: no cover |
| 142 | warnings.warn(UserWarning(W_POOL_SETTING)) |
| 143 | threaded = not w.use_eventloop or IS_WINDOWS |
| 144 | procs = w.min_concurrency |
| 145 | w.process_task = w._process_task |
| 146 | if not threaded: |
| 147 | semaphore = w.semaphore = LaxBoundedSemaphore(procs) |
| 148 | w._quick_acquire = w.semaphore.acquire |
| 149 | w._quick_release = w.semaphore.release |
| 150 | max_restarts = 100 |
| 151 | if w.pool_putlocks and w.pool_cls.uses_semaphore: |
| 152 | w.process_task = w._process_task_sem |
| 153 | allow_restart = w.pool_restarts |
| 154 | pool = w.pool = self.instantiate( |
| 155 | w.pool_cls, w.min_concurrency, |
| 156 | initargs=(w.app, w.hostname), |
| 157 | maxtasksperchild=w.max_tasks_per_child, |
| 158 | max_memory_per_child=w.max_memory_per_child, |
no outgoing calls