Read a complete binary message from an async stream. Args: reader: asyncio StreamReader. Returns: dict: Message dict with 'type', 'id', and payload fields. Raises: DirtyProtocolError: If the read fails or the message is malformed. asyncio.IncompleteReadError: If the stream is closed before any header bytes are read.
(reader: asyncio.StreamReader)
| 406 | |
@staticmethod
async def read_message_async(reader: asyncio.StreamReader) -> dict:
    """
    Read a complete binary message from async stream.

    Reads a fixed-size header of HEADER_SIZE bytes, decodes it into a
    message type, request id and payload length, then reads and
    TLV-decodes the payload when the length is non-zero.

    Args:
        reader: asyncio StreamReader

    Returns:
        dict: Message dict with 'type', 'id', and payload fields.
            Payload fields are merged last, so a payload key named
            'type' or 'id' overrides the header-derived value.

    Raises:
        DirtyProtocolError: If the stream ends part-way through the
            header or the payload, or if the message is malformed
            (undecodable payload, non-dict payload, unknown type)
        asyncio.IncompleteReadError: If the stream ends before any
            header byte is read (clean close between messages)
    """
    # Read header
    try:
        header = await reader.readexactly(HEADER_SIZE)
    except asyncio.IncompleteReadError as e:
        if not e.partial:
            # Clean close - no data was read, so this is a plain EOF
            # rather than a protocol violation; let the caller see it.
            raise
        raise DirtyProtocolError(
            f"Incomplete header: got {len(e.partial)} bytes, "
            f"expected {HEADER_SIZE}",
            raw_data=e.partial
        ) from e

    msg_type, request_id, length = BinaryProtocol.decode_header(header)

    # Read payload
    if length > 0:
        try:
            payload_data = await reader.readexactly(length)
        except asyncio.IncompleteReadError as e:
            # The header was already consumed, so any EOF here is a
            # truncated message, never a clean close.
            raise DirtyProtocolError(
                f"Incomplete payload: got {len(e.partial)} bytes, "
                f"expected {length}",
                raw_data=e.partial
            ) from e

        try:
            payload_dict = TLVEncoder.decode_full(payload_data)
        except DirtyProtocolError:
            raise
        except Exception as e:
            # Only a short prefix is attached to keep the error small.
            raise DirtyProtocolError(
                f"Failed to decode TLV payload: {e}",
                raw_data=payload_data[:50]
            ) from e

        # The payload is merged into the result with dict.update(), so
        # anything other than a dict is a malformed message.
        if not isinstance(payload_dict, dict):
            raise DirtyProtocolError(
                "Payload must decode to a dict, got "
                f"{type(payload_dict).__name__}",
                raw_data=payload_data[:50]
            )
    else:
        payload_dict = {}

    # Build response dict
    try:
        msg_type_str = MSG_TYPE_TO_STR[msg_type]
    except KeyError as e:
        # Report an unmapped type as a protocol error instead of leaking
        # a bare KeyError to the caller.
        raise DirtyProtocolError(
            f"Unknown message type: {msg_type!r}",
            raw_data=header
        ) from e

    result = {"type": msg_type_str, "id": request_id}
    result.update(payload_dict)

    return result
no test coverage detected