(self, content: bytes)
| 235 | self._chunk_size = chunk_size |
| 236 | |
| 237 | def decode(self, content: bytes) -> list[bytes]: |
| 238 | if self._chunk_size is None: |
| 239 | return [content] if content else [] |
| 240 | |
| 241 | self._buffer.write(content) |
| 242 | if self._buffer.tell() >= self._chunk_size: |
| 243 | value = self._buffer.getvalue() |
| 244 | chunks = [ |
| 245 | value[i : i + self._chunk_size] |
| 246 | for i in range(0, len(value), self._chunk_size) |
| 247 | ] |
| 248 | if len(chunks[-1]) == self._chunk_size: |
| 249 | self._buffer.seek(0) |
| 250 | self._buffer.truncate() |
| 251 | return chunks |
| 252 | else: |
| 253 | self._buffer.seek(0) |
| 254 | self._buffer.write(chunks[-1]) |
| 255 | self._buffer.truncate() |
| 256 | return chunks[:-1] |
| 257 | else: |
| 258 | return [] |
| 259 | |
| 260 | def flush(self) -> list[bytes]: |
| 261 | value = self._buffer.getvalue() |
no outgoing calls
no test coverage detected