MCPcopy
hub / github.com/celery/celery / __init__

Method __init__

celery/contrib/testing/worker.py:39–61  ·  view source on GitHub ↗
(self, *args, **kwargs)

Source from the content-addressed store, hash-verified

37 logger_queue = None
38
39 def __init__(self, *args, **kwargs):
40 # type: (*Any, **Any) -> None
41 self._on_started = threading.Event()
42
43 super().__init__(*args, **kwargs)
44
45 # Defensive check: pool_cls may be a string (e.g., 'prefork') or a class
46 pool_module = self.pool_cls if isinstance(self.pool_cls, str) else self.pool_cls.__module__
47 if pool_module.split('.')[-1] == 'prefork':
48 from billiard import Queue
49 self.logger_queue = Queue()
50 self.pid = os.getpid()
51
52 try:
53 from tblib import pickling_support
54 pickling_support.install()
55 except ImportError:
56 pass
57
58 # collect logs from forked process.
59 # XXX: those logs will appear twice in the live log
60 self.queue_listener = logging.handlers.QueueListener(self.logger_queue, logging.getLogger())
61 self.queue_listener.start()
62
63 class QueueHandler(logging.handlers.QueueHandler):
64 def prepare(self, record):

Callers

nothing calls this directly

Calls 4

QueueClass · 0.85
EventMethod · 0.80
installMethod · 0.45
startMethod · 0.45

Tested by

no test coverage detected