MCPcopy
hub / github.com/celery/celery / TaskPool

Class TaskPool

celery/concurrency/gevent.py:85–166  ·  view source on GitHub ↗

GEvent Pool.

Source from the content-addressed store, hash-verified

83
84
85class TaskPool(base.BasePool):
86 """GEvent Pool."""
87
88 Timer = Timer
89
90 signal_safe = False
91 is_green = True
92 task_join_will_block = False
93 _pool = None
94 _pool_map = None
95 _quick_put = None
96
97 def __init__(self, *args, **kwargs):
98 from gevent import getcurrent, spawn_raw
99 from gevent.pool import Pool
100 self.Pool = Pool
101 self.getcurrent = getcurrent
102 self.getpid = lambda: id(getcurrent())
103 self.spawn_n = spawn_raw
104 self.timeout = kwargs.get('timeout')
105 super().__init__(*args, **kwargs)
106
107 def on_start(self):
108 self._pool = self.Pool(self.limit)
109 self._pool_map = {}
110 self._quick_put = self._pool.spawn
111
112 def on_stop(self):
113 if self._pool is not None:
114 self._pool.join()
115
116 def on_apply(self, target, args=None, kwargs=None, callback=None,
117 accept_callback=None, timeout=None,
118 timeout_callback=None, apply_target=apply_target, **_):
119 timeout = self.timeout if timeout is None else timeout
120 target = self._make_killable_target(target)
121 greenlet = self._quick_put(apply_timeout if timeout else apply_target,
122 target, args, kwargs, callback, accept_callback,
123 self.getpid, timeout=timeout, timeout_callback=timeout_callback)
124 self._add_to_pool_map(id(greenlet), greenlet)
125 greenlet.terminate = types.MethodType(_terminate, greenlet)
126 return greenlet
127
128 def grow(self, n=1):
129 self._pool._semaphore.counter += n
130 self._pool.size += n
131
132 def shrink(self, n=1):
133 self._pool._semaphore.counter -= n
134 self._pool.size -= n
135
136 def terminate_job(self, pid, signal=None):
137 import gevent
138
139 if pid in self._pool_map:
140 greenlet = self._pool_map[pid]
141 gevent.kill(greenlet)
142

Callers 2

test_poolMethod · 0.90
test_terminate_jobMethod · 0.90

Calls

no outgoing calls

Tested by 2

test_poolMethod · 0.72
test_terminate_jobMethod · 0.72