Update prefetch count after pool grow/shrink operations. Index must be the change in number of processes as a positive (increasing) or negative (decreasing) number. Note: Currently pool grow operations will end up with an offset of +1 if the initial size of the pool was 0 (e.g. `--autoscale=1,0`).
(self, index=0)
| 267 | ) |
| 268 | |
def _update_prefetch_count(self, index=0):
    """Update prefetch count after pool grow/shrink operations.

    Recomputes ``initial_prefetch_count`` as the pool's current number
    of processes multiplied by ``prefetch_multiplier``, then forwards
    ``index`` to :meth:`_update_qos_eventually`.

    Arguments:
        index (int): Change in the number of processes, as a positive
            (increasing) or negative (decreasing) number.

    Returns:
        The result of :meth:`_update_qos_eventually`, or :const:`None`
        when nothing is updated because ``initial_prefetch_count`` or
        the pool's process count is falsy.

    Note:
        Currently pool grow operations will end up with an offset
        of +1 if the initial size of the pool was 0 (e.g.
        :option:`--autoscale=1,0 <celery worker --autoscale>`).
    """
    num_processes = self.pool.num_processes
    if not self.initial_prefetch_count or not num_processes:
        return None  # prefetch disabled
    # Multiply the same value the guard just checked, so the stored
    # count can never be derived from a different (possibly zero)
    # process count than the one that passed the check.
    self.initial_prefetch_count = num_processes * self.prefetch_multiplier
    return self._update_qos_eventually(index)
| 287 | |
| 288 | def _update_qos_eventually(self, index): |
| 289 | return (self.qos.decrement_eventually if index < 0 |