(self, byte_obj: Buffer)
| 390 | return True |
| 391 | |
def readinto(self, byte_obj: Buffer) -> int:
    """Copy unread bytes from the current chunk into ``byte_obj``.

    If no chunk is currently held, ``self._get_next_buffer()`` is asked to
    supply one. When that call returns a falsy value, or leaves
    ``self.current_buffer`` as ``None``, the stream is closed via
    ``self.close()`` and 0 is returned.

    Args:
        byte_obj: Writable target; filled from index 0 upwards.

    Returns:
        The number of bytes written: the smaller of ``len(byte_obj)`` and
        the bytes still unread in the current chunk, or 0 when no chunk
        could be obtained.
    """
    if self.current_buffer is None:
        refilled = self._get_next_buffer()
        if not refilled or self.current_buffer is None:
            self.close()
            return 0

    chunk = self.current_buffer
    start = self.current_buffer_pos
    count = min(len(byte_obj), len(chunk) - start)
    stop = start + count

    byte_obj[0:count] = chunk[start:stop]
    self.current_buffer_pos = stop

    # Once the chunk is fully consumed, drop it so the next call fetches
    # a fresh one.
    if stop == len(chunk):
        self.current_buffer = None
    return count
| 407 | |
| 408 | |
| 409 | # check if we are in a worker or not |
nothing calls this directly
no test coverage detected