Timeout configuration. **Usage**: Timeout(None) # No timeouts. Timeout(5.0) # 5s timeout on all operations. Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts. Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout
| 70 | |
| 71 | |
| 72 | class Timeout: |
| 73 | """ |
| 74 | Timeout configuration. |
| 75 | |
| 76 | **Usage**: |
| 77 | |
| 78 | Timeout(None) # No timeouts. |
| 79 | Timeout(5.0) # 5s timeout on all operations. |
| 80 | Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts. |
| 81 | Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout elsewhere. |
| 82 | Timeout(5.0, pool=None) # No timeout on acquiring connection from pool. |
| 83 | # 5s timeout elsewhere. |
| 84 | """ |
| 85 | |
| 86 | def __init__( |
| 87 | self, |
| 88 | timeout: TimeoutTypes | UnsetType = UNSET, |
| 89 | *, |
| 90 | connect: None | float | UnsetType = UNSET, |
| 91 | read: None | float | UnsetType = UNSET, |
| 92 | write: None | float | UnsetType = UNSET, |
| 93 | pool: None | float | UnsetType = UNSET, |
| 94 | ) -> None: |
| 95 | if isinstance(timeout, Timeout): |
| 96 | # Passed as a single explicit Timeout. |
| 97 | assert connect is UNSET |
| 98 | assert read is UNSET |
| 99 | assert write is UNSET |
| 100 | assert pool is UNSET |
| 101 | self.connect = timeout.connect # type: typing.Optional[float] |
| 102 | self.read = timeout.read # type: typing.Optional[float] |
| 103 | self.write = timeout.write # type: typing.Optional[float] |
| 104 | self.pool = timeout.pool # type: typing.Optional[float] |
| 105 | elif isinstance(timeout, tuple): |
| 106 | # Passed as a tuple. |
| 107 | self.connect = timeout[0] |
| 108 | self.read = timeout[1] |
| 109 | self.write = None if len(timeout) < 3 else timeout[2] |
| 110 | self.pool = None if len(timeout) < 4 else timeout[3] |
| 111 | elif not ( |
| 112 | isinstance(connect, UnsetType) |
| 113 | or isinstance(read, UnsetType) |
| 114 | or isinstance(write, UnsetType) |
| 115 | or isinstance(pool, UnsetType) |
| 116 | ): |
| 117 | self.connect = connect |
| 118 | self.read = read |
| 119 | self.write = write |
| 120 | self.pool = pool |
| 121 | else: |
| 122 | if isinstance(timeout, UnsetType): |
| 123 | raise ValueError( |
| 124 | "httpx.Timeout must either include a default, or set all " |
| 125 | "four parameters explicitly." |
| 126 | ) |
| 127 | self.connect = timeout if isinstance(connect, UnsetType) else connect |
| 128 | self.read = timeout if isinstance(read, UnsetType) else read |
| 129 | self.write = timeout if isinstance(write, UnsetType) else write |
no outgoing calls
no test coverage detected