(self, data, from_trailer=False)
| 281 | return (name, value) |
| 282 | |
| 283 | def parse_headers(self, data, from_trailer=False): |
| 284 | headers = [] |
| 285 | |
| 286 | # Split lines on \r\n |
| 287 | lines = [bytes_to_str(line) for line in data.split(b"\r\n")] |
| 288 | |
| 289 | # handle scheme headers |
| 290 | scheme_state = [False] |
| 291 | if from_trailer: |
| 292 | # nonsense. either a request is https from the beginning |
| 293 | # .. or we are just behind a proxy who does not remove conflicting trailers |
| 294 | secure_scheme_headers, forwarder_headers = {}, [] |
| 295 | else: |
| 296 | secure_scheme_headers, forwarder_headers = self._peer_trusted_for_forwarded() |
| 297 | |
| 298 | # Parse headers into key/value pairs paying attention |
| 299 | # to continuation lines. |
| 300 | while lines: |
| 301 | if len(headers) >= self.limit_request_fields: |
| 302 | raise LimitRequestHeaders("limit request headers fields") |
| 303 | |
| 304 | # Parse initial header name: value pair. |
| 305 | curr = lines.pop(0) |
| 306 | header_length = len(curr) + len("\r\n") |
| 307 | if curr.find(":") <= 0: |
| 308 | raise InvalidHeader(curr) |
| 309 | name, value = curr.split(":", 1) |
| 310 | if self.cfg.strip_header_spaces: |
| 311 | name = name.rstrip(" \t") |
| 312 | if not TOKEN_RE.fullmatch(name): |
| 313 | raise InvalidHeaderName(name) |
| 314 | |
| 315 | # this is still a dangerous place to do this |
| 316 | # but it is more correct than doing it before the pattern match: |
| 317 | # after we entered Unicode wonderland, 8bits could case-shift into ASCII: |
| 318 | # b"\xDF".decode("latin-1").upper().encode("ascii") == b"SS" |
| 319 | name = name.upper() |
| 320 | |
| 321 | # RFC 9110 section 6.5.1 |
| 322 | if from_trailer and name in RFC9110_6_5_1_FORBIDDEN_TRAILER: |
| 323 | raise InvalidHeaderName(name) |
| 324 | |
| 325 | value = [value.strip(" \t")] |
| 326 | |
| 327 | # Consume value continuation lines.. |
| 328 | while lines and lines[0].startswith((" ", "\t")): |
| 329 | # .. which is obsolete here, and no longer done by default |
| 330 | if not self.cfg.permit_obsolete_folding: |
| 331 | raise ObsoleteFolding(name) |
| 332 | curr = lines.pop(0) |
| 333 | header_length += len(curr) + len("\r\n") |
| 334 | if header_length > self.limit_request_field_size > 0: |
| 335 | raise LimitRequestHeaders("limit request headers " |
| 336 | "fields size") |
| 337 | value.append(curr.strip("\t ")) |
| 338 | value = " ".join(value) |
| 339 | |
| 340 | if RFC9110_5_5_INVALID_AND_DANGEROUS.search(value): |
no test coverage detected