Get from pool or create new connection.
(self, req)
| 277 | |
| 278 | @asyncio.coroutine |
| 279 | def connect(self, req): |
| 280 | """Get from pool or create new connection.""" |
| 281 | key = (req.host, req.port, req.ssl) |
| 282 | |
| 283 | limit = self._limit |
| 284 | if limit is not None: |
| 285 | fut = asyncio.Future(loop=self._loop) |
| 286 | waiters = self._waiters[key] |
| 287 | |
| 288 | # The limit defines the maximum number of concurrent connections |
| 289 | # for a key. Waiters must be counted against the limit, even before |
| 290 | # the underlying connection is created. |
| 291 | available = limit - len(waiters) - len(self._acquired[key]) |
| 292 | |
| 293 | # Don't wait if there are connections available. |
| 294 | if available > 0: |
| 295 | fut.set_result(None) |
| 296 | |
| 297 | # This connection will now count towards the limit. |
| 298 | waiters.append(fut) |
| 299 | |
| 300 | yield from fut |
| 301 | |
| 302 | transport, proto = self._get(key) |
| 303 | if transport is None: |
| 304 | try: |
| 305 | if self._conn_timeout: |
| 306 | transport, proto = yield from asyncio.wait_for( |
| 307 | self._create_connection(req), |
| 308 | self._conn_timeout, loop=self._loop) |
| 309 | else: |
| 310 | transport, proto = yield from self._create_connection(req) |
| 311 | |
| 312 | except asyncio.TimeoutError as exc: |
| 313 | raise ClientTimeoutError( |
| 314 | 'Connection timeout to host {0[0]}:{0[1]} ssl:{0[2]}' |
| 315 | .format(key)) from exc |
| 316 | except OSError as exc: |
| 317 | raise ClientOSError( |
| 318 | exc.errno, |
| 319 | 'Cannot connect to host {0[0]}:{0[1]} ssl:{0[2]} [{1}]' |
| 320 | .format(key, exc.strerror)) from exc |
| 321 | |
| 322 | self._acquired[key].add(transport) |
| 323 | conn = Connection(self, key, req, transport, proto, self._loop) |
| 324 | return conn |
| 325 | |
| 326 | def _get(self, key): |
| 327 | try: |