MCPcopy
hub / github.com/benoitc/gunicorn / MockStreamWriter

Class MockStreamWriter

benchmarks/dirty_streaming.py:52–82  ·  view source on GitHub ↗

Mock StreamWriter that captures written messages.

Source from the content-addressed store, hash-verified

50
51
52class MockStreamWriter:
53 """Mock StreamWriter that captures written messages."""
54
55 def __init__(self):
56 self.messages = []
57 self._buffer = b""
58 self.bytes_written = 0
59
60 def write(self, data):
61 self._buffer += data
62 self.bytes_written += len(data)
63
64 async def drain(self):
65 while len(self._buffer) >= DirtyProtocol.HEADER_SIZE:
66 length = struct.unpack(
67 DirtyProtocol.HEADER_FORMAT,
68 self._buffer[:DirtyProtocol.HEADER_SIZE]
69 )[0]
70 total_size = DirtyProtocol.HEADER_SIZE + length
71 if len(self._buffer) >= total_size:
72 msg_data = self._buffer[DirtyProtocol.HEADER_SIZE:total_size]
73 self._buffer = self._buffer[total_size:]
74 self.messages.append(DirtyProtocol.decode(msg_data))
75 else:
76 break
77
78 def close(self):
79 pass
80
81 async def wait_closed(self):
82 pass
83
84
85class MockStreamReader:

Callers 6

mock_get_connectionFunction · 0.70
run_streamFunction · 0.70

Calls

no outgoing calls

Tested by

no test coverage detected