Attempts to find a position in the read buffer that satisfies the currently-pending read. Returns a position in the buffer if the current read can be satisfied, or None if it cannot.
(self)
| 899 | self._finish_read(pos) |
| 900 | |
| 901 | def _find_read_pos(self) -> Optional[int]: |
| 902 | """Attempts to find a position in the read buffer that satisfies |
| 903 | the currently-pending read. |
| 904 | |
| 905 | Returns a position in the buffer if the current read can be satisfied, |
| 906 | or None if it cannot. |
| 907 | """ |
| 908 | if self._read_bytes is not None and ( |
| 909 | self._read_buffer_size >= self._read_bytes |
| 910 | or (self._read_partial and self._read_buffer_size > 0) |
| 911 | ): |
| 912 | num_bytes = min(self._read_bytes, self._read_buffer_size) |
| 913 | return num_bytes |
| 914 | elif self._read_delimiter is not None: |
| 915 | # Multi-byte delimiters (e.g. '\r\n') may straddle two |
| 916 | # chunks in the read buffer, so we can't easily find them |
| 917 | # without collapsing the buffer. However, since protocols |
| 918 | # using delimited reads (as opposed to reads of a known |
| 919 | # length) tend to be "line" oriented, the delimiter is likely |
| 920 | # to be in the first few chunks. Merge the buffer gradually |
| 921 | # since large merges are relatively expensive and get undone in |
| 922 | # _consume(). |
| 923 | if self._read_buffer: |
| 924 | loc = self._read_buffer.find(self._read_delimiter) |
| 925 | if loc != -1: |
| 926 | delimiter_len = len(self._read_delimiter) |
| 927 | self._check_max_bytes(self._read_delimiter, loc + delimiter_len) |
| 928 | return loc + delimiter_len |
| 929 | self._check_max_bytes(self._read_delimiter, self._read_buffer_size) |
| 930 | elif self._read_regex is not None: |
| 931 | if self._read_buffer: |
| 932 | m = self._read_regex.search(self._read_buffer) |
| 933 | if m is not None: |
| 934 | loc = m.end() |
| 935 | self._check_max_bytes(self._read_regex, loc) |
| 936 | return loc |
| 937 | self._check_max_bytes(self._read_regex, self._read_buffer_size) |
| 938 | return None |
| 939 | |
| 940 | def _check_max_bytes(self, delimiter: Union[bytes, Pattern], size: int) -> None: |
| 941 | if self._read_max_bytes is not None and size > self._read_max_bytes: |
no test coverage detected