Mock StreamWriter that captures written messages.
| 50 | |
| 51 | |
| 52 | class MockStreamWriter: |
| 53 | """Mock StreamWriter that captures written messages.""" |
| 54 | |
| 55 | def __init__(self): |
| 56 | self.messages = [] |
| 57 | self._buffer = b"" |
| 58 | self.bytes_written = 0 |
| 59 | |
| 60 | def write(self, data): |
| 61 | self._buffer += data |
| 62 | self.bytes_written += len(data) |
| 63 | |
| 64 | async def drain(self): |
| 65 | while len(self._buffer) >= DirtyProtocol.HEADER_SIZE: |
| 66 | length = struct.unpack( |
| 67 | DirtyProtocol.HEADER_FORMAT, |
| 68 | self._buffer[:DirtyProtocol.HEADER_SIZE] |
| 69 | )[0] |
| 70 | total_size = DirtyProtocol.HEADER_SIZE + length |
| 71 | if len(self._buffer) >= total_size: |
| 72 | msg_data = self._buffer[DirtyProtocol.HEADER_SIZE:total_size] |
| 73 | self._buffer = self._buffer[total_size:] |
| 74 | self.messages.append(DirtyProtocol.decode(msg_data)) |
| 75 | else: |
| 76 | break |
| 77 | |
| 78 | def close(self): |
| 79 | pass |
| 80 | |
| 81 | async def wait_closed(self): |
| 82 | pass |
| 83 | |
| 84 | |
| 85 | class MockStreamReader: |
no outgoing calls
no test coverage detected