MCPcopy
hub / github.com/encode/starlette / GZipResponder

Class GZipResponder

starlette/middleware/gzip.py:119–141  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

117
118
119class GZipResponder(IdentityResponder):
120 content_encoding = "gzip"
121
122 def __init__(self, app: ASGIApp, minimum_size: int, compresslevel: int = 9) -> None:
123 super().__init__(app, minimum_size)
124
125 self.gzip_buffer = io.BytesIO()
126 self.gzip_file = gzip.GzipFile(mode="wb", fileobj=self.gzip_buffer, compresslevel=compresslevel)
127
128 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
129 with self.gzip_buffer, self.gzip_file:
130 await super().__call__(scope, receive, send)
131
132 def apply_compression(self, body: bytes, *, more_body: bool) -> bytes:
133 self.gzip_file.write(body)
134 if not more_body:
135 self.gzip_file.close()
136
137 body = self.gzip_buffer.getvalue()
138 self.gzip_buffer.seek(0)
139 self.gzip_buffer.truncate()
140
141 return body
142
143
144async def unattached_send(message: Message) -> NoReturn:

Callers 1

__call__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected