MCPcopy
hub / github.com/benoitc/gunicorn / drain

Method drain

tests/dirty/test_streaming_integration.py:71–89  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

69 self._buffer += data
70
71 async def drain(self):
72 # Decode the buffer to extract messages using binary protocol
73 while len(self._buffer) >= HEADER_SIZE:
74 # Decode header to get payload length
75 _, _, length = BinaryProtocol.decode_header(
76 self._buffer[:HEADER_SIZE]
77 )
78 total_size = HEADER_SIZE + length
79 if len(self._buffer) >= total_size:
80 msg_data = self._buffer[:total_size]
81 self._buffer = self._buffer[total_size:]
82 # decode_message returns (msg_type_str, request_id, payload_dict)
83 msg_type_str, request_id, payload_dict = BinaryProtocol.decode_message(msg_data)
84 # Reconstruct the dict format for backwards compatibility
85 result = {"type": msg_type_str, "id": request_id}
86 result.update(payload_dict)
87 self.messages.append(result)
88 else:
89 break
90
91 def close(self):
92 self.closed = True

Callers

nothing calls this directly

Calls 2

decode_headerMethod · 0.80
decode_messageMethod · 0.45

Tested by

no test coverage detected