MCPcopy
hub / github.com/celery/celery / __init__

Method __init__

celery/contrib/rdb.py:91–114  ·  view source on GitHub ↗
(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
                 port_search_limit=100, port_skew=+0, out=sys.stdout)

Source from the content-addressed store, hash-verified

89 _sock = None
90
91 def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
92 port_search_limit=100, port_skew=+0, out=sys.stdout):
93 self.active = True
94 self.out = out
95
96 self._prev_handles = sys.stdin, sys.stdout
97
98 self._sock, this_port = self.get_avail_port(
99 host, port, port_search_limit, port_skew,
100 )
101 self._sock.setblocking(1)
102 self._sock.listen(1)
103 self.ident = f'{self.me}:{this_port}'
104 self.host = host
105 self.port = this_port
106 self.say(BANNER.format(self=self))
107
108 self._client, address = self._sock.accept()
109 self._client.setblocking(1)
110 self.remote_addr = ':'.join(str(v) for v in address)
111 self.say(SESSION_STARTED.format(self=self))
112 self._handle = sys.stdin = sys.stdout = self._client.makefile('rw')
113 super().__init__(completekey='tab',
114 stdin=self._handle, stdout=self._handle)
115
116 def get_avail_port(self, host, port, search_limit=100, skew=+0):
117 try:

Callers

nothing calls this directly

Calls 4

get_avail_portMethod · 0.95
sayMethod · 0.95
formatMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected