Parse chunked transfer encoding.
(self)
| 701 | return True |
| 702 | |
| 703 | def _parse_chunked_body(self): |
| 704 | """Parse chunked transfer encoding.""" |
| 705 | while self._buffer: |
| 706 | if self._chunk_state == 'size': |
| 707 | # Looking for chunk size line |
| 708 | idx = self._buffer.find(b'\r\n') |
| 709 | if idx == -1: |
| 710 | return False |
| 711 | |
| 712 | size_line = bytes(self._buffer[:idx]) |
| 713 | del self._buffer[:idx + 2] |
| 714 | |
| 715 | # Handle chunk extensions (e.g., "5;ext=value") |
| 716 | semicolon = size_line.find(b';') |
| 717 | if semicolon != -1: |
| 718 | # RFC 9112: chunk-ext must not contain bare CR |
| 719 | chunk_ext = size_line[semicolon + 1:] |
| 720 | if b'\r' in chunk_ext: |
| 721 | raise InvalidChunkExtension("bare CR not allowed") |
| 722 | size_line = size_line[:semicolon] |
| 723 | |
| 724 | # Strict validation: reject leading/trailing whitespace |
| 725 | # to prevent parser desync (request smuggling vector) |
| 726 | if size_line != size_line.strip(): |
| 727 | raise InvalidChunkSize("Whitespace in chunk size") |
| 728 | if not size_line: |
| 729 | raise InvalidChunkSize("Empty chunk size") |
| 730 | |
| 731 | # Validate hex characters only (0-9, a-f, A-F) |
| 732 | for c in size_line: |
| 733 | if c not in b'0123456789abcdefABCDEF': |
| 734 | raise InvalidChunkSize("Invalid character in chunk size") |
| 735 | |
| 736 | try: |
| 737 | self._chunk_size = int(size_line, 16) |
| 738 | except ValueError: |
| 739 | raise InvalidChunkSize("Invalid chunk size") |
| 740 | |
| 741 | if self._chunk_size == 0: |
| 742 | # Final chunk - skip trailers |
| 743 | self._chunk_state = 'trailer' |
| 744 | else: |
| 745 | self._chunk_remaining = self._chunk_size |
| 746 | self._chunk_state = 'data' |
| 747 | |
| 748 | elif self._chunk_state == 'data': |
| 749 | # Reading chunk data |
| 750 | if not self._buffer: |
| 751 | return False |
| 752 | |
| 753 | to_read = min(len(self._buffer), self._chunk_remaining) |
| 754 | chunk = bytes(self._buffer[:to_read]) |
| 755 | del self._buffer[:to_read] |
| 756 | self._chunk_remaining -= to_read |
| 757 | |
| 758 | if self._on_body: |
| 759 | self._on_body(chunk) |
| 760 |
no test coverage detected