(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
port_search_limit=100, port_skew=+0, out=sys.stdout)
| 89 | _sock = None |
| 90 | |
| 91 | def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT, |
| 92 | port_search_limit=100, port_skew=+0, out=sys.stdout): |
| 93 | self.active = True |
| 94 | self.out = out |
| 95 | |
| 96 | self._prev_handles = sys.stdin, sys.stdout |
| 97 | |
| 98 | self._sock, this_port = self.get_avail_port( |
| 99 | host, port, port_search_limit, port_skew, |
| 100 | ) |
| 101 | self._sock.setblocking(1) |
| 102 | self._sock.listen(1) |
| 103 | self.ident = f'{self.me}:{this_port}' |
| 104 | self.host = host |
| 105 | self.port = this_port |
| 106 | self.say(BANNER.format(self=self)) |
| 107 | |
| 108 | self._client, address = self._sock.accept() |
| 109 | self._client.setblocking(1) |
| 110 | self.remote_addr = ':'.join(str(v) for v in address) |
| 111 | self.say(SESSION_STARTED.format(self=self)) |
| 112 | self._handle = sys.stdin = sys.stdout = self._client.makefile('rw') |
| 113 | super().__init__(completekey='tab', |
| 114 | stdin=self._handle, stdout=self._handle) |
| 115 | |
| 116 | def get_avail_port(self, host, port, search_limit=100, skew=+0): |
| 117 | try: |
nothing calls this directly
no test coverage detected