(self)
| 69 | self._buffer += data |
| 70 | |
| 71 | async def drain(self): |
| 72 | # Decode the buffer to extract messages using binary protocol |
| 73 | while len(self._buffer) >= HEADER_SIZE: |
| 74 | # Decode header to get payload length |
| 75 | _, _, length = BinaryProtocol.decode_header( |
| 76 | self._buffer[:HEADER_SIZE] |
| 77 | ) |
| 78 | total_size = HEADER_SIZE + length |
| 79 | if len(self._buffer) >= total_size: |
| 80 | msg_data = self._buffer[:total_size] |
| 81 | self._buffer = self._buffer[total_size:] |
| 82 | # decode_message returns (msg_type_str, request_id, payload_dict) |
| 83 | msg_type_str, request_id, payload_dict = BinaryProtocol.decode_message(msg_data) |
| 84 | # Reconstruct the dict format for backwards compatibility |
| 85 | result = {"type": msg_type_str, "id": request_id} |
| 86 | result.update(payload_dict) |
| 87 | self.messages.append(result) |
| 88 | else: |
| 89 | break |
| 90 | |
| 91 | def close(self): |
| 92 | self.closed = True |
nothing calls this directly
no test coverage detected