(self)
| 1167 | assert w.process_task is w._process_task |
| 1168 | |
| 1169 | def test_Pool_create(self): |
| 1170 | from kombu.asynchronous.semaphore import LaxBoundedSemaphore |
| 1171 | w = Mock() |
| 1172 | w._conninfo.connection_errors = w._conninfo.channel_errors = () |
| 1173 | w.hub = Mock() |
| 1174 | |
| 1175 | PoolImp = Mock() |
| 1176 | poolimp = PoolImp.return_value = Mock() |
| 1177 | poolimp._pool = [Mock(), Mock()] |
| 1178 | poolimp._cache = {} |
| 1179 | poolimp._fileno_to_inq = {} |
| 1180 | poolimp._fileno_to_outq = {} |
| 1181 | |
| 1182 | from celery.concurrency.prefork import TaskPool as _TaskPool |
| 1183 | |
| 1184 | class MockTaskPool(_TaskPool): |
| 1185 | Pool = PoolImp |
| 1186 | |
| 1187 | @property |
| 1188 | def timers(self): |
| 1189 | return {Mock(): 30} |
| 1190 | |
| 1191 | w.pool_cls = MockTaskPool |
| 1192 | w.use_eventloop = True |
| 1193 | w.consumer.restart_count = -1 |
| 1194 | pool = components.Pool(w) |
| 1195 | pool.create(w) |
| 1196 | pool.register_with_event_loop(w, w.hub) |
| 1197 | if sys.platform != 'win32': |
| 1198 | assert isinstance(w.semaphore, LaxBoundedSemaphore) |
| 1199 | P = w.pool |
| 1200 | P.start() |
| 1201 | |
| 1202 | def test_wait_for_soft_shutdown(self): |
| 1203 | worker = self.worker |
nothing calls this directly
no test coverage detected