Return a full frame from the bytes we have in the buffer.
(self)
| 70 | self.buffer = bytearray() |
| 71 | |
def frame_from_buffer(self) -> bytes | None:
    """Pop one complete frame off the front of the buffer, if one is available.

    A frame is a length header of ``HEADER_SIZE`` bytes, decoded with the
    ``"!L"`` struct format (network byte order, unsigned), followed by that
    many payload bytes.

    The decoded length is cached in ``self.message_size`` so the header is
    only parsed once per frame, even when the payload arrives over several
    calls. The cache is cleared once the frame has been handed out.

    Returns:
        The payload bytes of the next frame (header stripped), or ``None``
        when the buffer does not yet hold a complete frame. On success the
        consumed bytes are dropped by rebinding ``self.buffer`` to the
        remaining data.
    """
    available = len(self.buffer)
    if available < HEADER_SIZE:
        # Not even a full length header has arrived yet.
        return None

    if self.message_size is None:
        # First look at this frame: decode and remember the payload length.
        (self.message_size,) = struct.unpack("!L", self.buffer[:HEADER_SIZE])

    frame_end = HEADER_SIZE + self.message_size
    if available < frame_end:
        # Header is known but the payload is still incomplete.
        return None

    # Slice through a memoryview so the payload is copied only once (by
    # bytes() below) instead of twice; this matters for large frames.
    payload_view = memoryview(self.buffer)[HEADER_SIZE:frame_end]
    self.buffer = self.buffer[frame_end:]
    self.message_size = None
    return bytes(payload_view)
| 86 | |
def read(self, size: int = MAX_READ) -> str:
    """Return the result of ``self.read_bytes(size)`` decoded as UTF-8 text.

    Args:
        size: Passed straight through to ``read_bytes``.

    Raises:
        UnicodeDecodeError: If the bytes returned are not valid UTF-8.
    """
    raw = self.read_bytes(size)
    return raw.decode("utf-8")
no test coverage detected