( # type: ignore
self,
max_clients: int = 10,
hostname_mapping: Optional[Dict[str, str]] = None,
max_buffer_size: int = 104857600,
resolver: Optional[Resolver] = None,
defaults: Optional[Dict[str, Any]] = None,
max_header_size: Optional[int] = None,
max_body_size: Optional[int] = None,
)
| 116 | """ |
| 117 | |
def initialize(  # type: ignore
    self,
    max_clients: int = 10,
    hostname_mapping: Optional[Dict[str, str]] = None,
    max_buffer_size: int = 104857600,
    resolver: Optional[Resolver] = None,
    defaults: Optional[Dict[str, Any]] = None,
    max_header_size: Optional[int] = None,
    max_body_size: Optional[int] = None,
) -> None:
    """Configure limits, request bookkeeping and the TCP connector.

    Args:
        max_clients: Stored unchanged as ``self.max_clients``.
        hostname_mapping: When not ``None``, the resolver in use is wrapped
            in an ``OverrideResolver`` built with this mapping.
        max_buffer_size: Stored unchanged as ``self.max_buffer_size``.
        resolver: Resolver to use. A falsy value (including ``None``)
            causes a fresh ``Resolver()`` to be created instead.
        defaults: Forwarded to the parent class's ``initialize``.
        max_header_size: Stored unchanged as ``self.max_header_size``.
        max_body_size: Stored unchanged as ``self.max_body_size``.
    """
    super().initialize(defaults=defaults)

    # Plain configuration values, kept exactly as passed in.
    self.max_clients = max_clients
    self.max_buffer_size = max_buffer_size
    self.max_header_size = max_header_size
    self.max_body_size = max_body_size

    # Request bookkeeping containers; all start out empty.
    self.queue = (
        collections.deque()
    )  # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
    self.active = (
        {}
    )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
    self.waiting = (
        {}
    )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]

    # The resolver is built here rather than left to TCPClient because
    # hostname_mapping has to be layered on top of it.
    base_resolver = resolver if resolver else Resolver()
    self.resolver = base_resolver
    # True only when the resolver was created in this method rather than
    # supplied by the caller (presumably consulted on close -- confirm).
    self.own_resolver = base_resolver is not resolver
    if hostname_mapping is not None:
        self.resolver = OverrideResolver(
            resolver=base_resolver, mapping=hostname_mapping
        )

    self.tcp_client = TCPClient(resolver=self.resolver)
| 155 | |
| 156 | def close(self) -> None: |
| 157 | super().close() |
nothing calls this directly
no test coverage detected