(
self,
timeout: float = _INITIAL_CONNECT_TIMEOUT,
connect_timeout: Optional[Union[float, datetime.timedelta]] = None,
)
| 102 | return primary, secondary |
| 103 | |
| 104 | def start( |
| 105 | self, |
| 106 | timeout: float = _INITIAL_CONNECT_TIMEOUT, |
| 107 | connect_timeout: Optional[Union[float, datetime.timedelta]] = None, |
| 108 | ) -> "Future[Tuple[socket.AddressFamily, Any, IOStream]]": |
| 109 | self.try_connect(iter(self.primary_addrs)) |
| 110 | self.set_timeout(timeout) |
| 111 | if connect_timeout is not None: |
| 112 | self.set_connect_timeout(connect_timeout) |
| 113 | return self.future |
| 114 | |
| 115 | def try_connect(self, addrs: Iterator[Tuple[socket.AddressFamily, Tuple]]) -> None: |
| 116 | try: |
no test coverage detected