(self)
| 34 | self._buffer += data |
| 35 | |
| 36 | async def drain(self): |
| 37 | # Decode the buffer to extract messages using binary protocol |
| 38 | while len(self._buffer) >= HEADER_SIZE: |
| 39 | # Decode header to get payload length |
| 40 | _, _, length = BinaryProtocol.decode_header( |
| 41 | self._buffer[:HEADER_SIZE] |
| 42 | ) |
| 43 | total_size = HEADER_SIZE + length |
| 44 | if len(self._buffer) >= total_size: |
| 45 | msg_data = self._buffer[:total_size] |
| 46 | self._buffer = self._buffer[total_size:] |
| 47 | # decode_message returns (msg_type_str, request_id, payload_dict) |
| 48 | msg_type_str, request_id, payload_dict = BinaryProtocol.decode_message(msg_data) |
| 49 | # Reconstruct the dict format for backwards compatibility |
| 50 | result = {"type": msg_type_str, "id": request_id} |
| 51 | result.update(payload_dict) |
| 52 | self.messages.append(result) |
| 53 | else: |
| 54 | break |
| 55 | |
| 56 | def close(self): |
| 57 | self.closed = True |
nothing calls this directly
no test coverage detected