MCPcopy
hub / github.com/celery/celery / test_Pool_create

Method test_Pool_create

t/unit/worker/test_worker.py:1169–1200  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1167 assert w.process_task is w._process_task
1168
def test_Pool_create(self):
    """Create and register the ``components.Pool`` step against mocks.

    Builds a mock worker and a ``TaskPool`` subclass whose ``Pool``
    implementation is a mock, runs ``create`` and
    ``register_with_event_loop`` on the component, and on non-Windows
    platforms asserts that the worker's semaphore is a
    ``LaxBoundedSemaphore``.
    """
    from kombu.asynchronous.semaphore import LaxBoundedSemaphore

    # Stand-in worker: empty error tuples on the connection info and a
    # mock hub.
    w = Mock()
    w._conninfo.connection_errors = w._conninfo.channel_errors = ()
    w.hub = Mock()

    # ``PoolImp`` plays the role of the pool implementation class; calling
    # it yields ``poolimp``, which is given two mock entries in ``_pool``
    # and empty lookup dicts.
    # NOTE(review): presumably these are the attributes the prefork pool
    # reads while starting / registering with the hub -- confirm.
    PoolImp = Mock()
    poolimp = PoolImp.return_value = Mock()
    poolimp._pool = [Mock(), Mock()]
    poolimp._cache = {}
    poolimp._fileno_to_inq = {}
    poolimp._fileno_to_outq = {}

    from celery.concurrency.prefork import TaskPool as _TaskPool

    class MockTaskPool(_TaskPool):
        # Swap the real pool implementation for the mock defined above.
        Pool = PoolImp

        @property
        def timers(self):
            # A single mock entry mapped to the value 30.
            return {Mock(): 30}

    # Configure the worker to use the mocked pool class with the event
    # loop enabled.
    w.pool_cls = MockTaskPool
    w.use_eventloop = True
    w.consumer.restart_count = -1
    pool = components.Pool(w)
    pool.create(w)
    pool.register_with_event_loop(w, w.hub)
    # NOTE(review): the listing this was taken from had lost its
    # indentation; the last two statements are assumed to sit under the
    # platform guard together with the assertion -- confirm against the
    # upstream file.
    if sys.platform != 'win32':
        assert isinstance(w.semaphore, LaxBoundedSemaphore)
        P = w.pool
        P.start()
1201
1202 def test_wait_for_soft_shutdown(self):
1203 worker = self.worker

Callers

nothing calls this directly

Calls 3

createMethod · 0.95
startMethod · 0.45

Tested by

no test coverage detected