Called when all headers received. Validates headers for request smuggling vulnerabilities: - Rejects duplicate Content-Length headers - Rejects requests with both Content-Length and Transfer-Encoding - Rejects chunked Transfer-Encoding in HTTP/1.0 - Rejects stacked chunked encoding - Validates Transfer-Encoding values
(self)
| 569 | self._on_header(name_lower, value) |
| 570 | |
| 571 | def _finalize_headers(self): |
| 572 | """Called when all headers received. |
| 573 | |
| 574 | Validates headers for request smuggling vulnerabilities: |
| 575 | - Rejects duplicate Content-Length headers |
| 576 | - Rejects requests with both Content-Length and Transfer-Encoding |
| 577 | - Rejects chunked Transfer-Encoding in HTTP/1.0 |
| 578 | - Rejects stacked chunked encoding |
| 579 | - Validates Transfer-Encoding values |
| 580 | """ |
| 581 | self.headers = self._headers_list |
| 582 | |
| 583 | # Extract and validate content-length and transfer-encoding |
| 584 | content_length = None |
| 585 | chunked = False |
| 586 | |
| 587 | for name, value in self.headers: |
| 588 | if name == b'content-length': |
| 589 | # Reject duplicate Content-Length headers (request smuggling vector) |
| 590 | if content_length is not None: |
| 591 | raise InvalidHeader("Duplicate Content-Length header") |
| 592 | try: |
| 593 | cl_value = int(value) |
| 594 | except ValueError: |
| 595 | raise InvalidHeader("Invalid Content-Length value") |
| 596 | if cl_value < 0: |
| 597 | raise InvalidHeader("Negative Content-Length") |
| 598 | content_length = cl_value |
| 599 | |
| 600 | elif name == b'transfer-encoding': |
| 601 | # Properly parse comma-separated Transfer-Encoding values |
| 602 | # per RFC 9112 Section 6.1 |
| 603 | vals = [v.strip() for v in value.split(b',')] |
| 604 | for val in vals: |
| 605 | val_lower = val.lower() |
| 606 | if val_lower == b'chunked': |
| 607 | # Reject stacked chunked encoding (request smuggling vector) |
| 608 | if chunked: |
| 609 | raise InvalidHeader("Stacked chunked encoding") |
| 610 | chunked = True |
| 611 | elif val_lower == b'identity': |
| 612 | # identity after chunked is invalid |
| 613 | if chunked: |
| 614 | raise InvalidHeader("Invalid Transfer-Encoding after chunked") |
| 615 | elif val_lower in (b'compress', b'deflate', b'gzip'): |
| 616 | # Compression after chunked is invalid |
| 617 | if chunked: |
| 618 | raise InvalidHeader("Invalid Transfer-Encoding after chunked") |
| 619 | # Mark connection for close (unsupported but valid) |
| 620 | self.should_keep_alive = False |
| 621 | else: |
| 622 | # Reject unknown transfer codings |
| 623 | raise UnsupportedTransferCoding(val.decode('latin-1')) |
| 624 | |
| 625 | elif name == b'connection': |
| 626 | val = value.lower() |
| 627 | if b'close' in val: |
| 628 | self.should_keep_alive = False |
no test coverage detected