Memory-efficient bytes buffer. To return decoded data in read() and still follow the BufferedIOBase API, we need a buffer to always return the correct amount of bytes. This buffer should be filled using calls to put(). Our maximum memory usage is determined by the sum of the sizes of self.buffer, which contains the full data, and the largest chunk that we will copy in get().
| 372 | |
| 373 | |
| 374 | class BytesQueueBuffer: |
| 375 | """Memory-efficient bytes buffer |
| 376 | |
| 377 | To return decoded data in read() and still follow the BufferedIOBase API, we need a |
| 378 | buffer to always return the correct amount of bytes. |
| 379 | |
| 380 | This buffer should be filled using calls to put() |
| 381 | |
| 382 | Our maximum memory usage is determined by the sum of the size of: |
| 383 | |
| 384 | * self.buffer, which contains the full data |
| 385 | * the largest chunk that we will copy in get() |
| 386 | """ |
| 387 | |
| 388 | def __init__(self) -> None: |
| 389 | self.buffer: typing.Deque[bytes | memoryview[bytes]] = collections.deque() |
| 390 | self._size: int = 0 |
| 391 | |
| 392 | def __len__(self) -> int: |
| 393 | return self._size |
| 394 | |
def put(self, data: bytes) -> None:
    """Append ``data`` to the right end of the queue and add its length to the size."""
    chunk_queue = self.buffer
    chunk_queue.append(data)
    # Keep the running byte count in step with what was just queued.
    self._size = self._size + len(data)
| 398 | |
| 399 | def get(self, n: int) -> bytes: |
| 400 | if n == 0: |
| 401 | return b"" |
| 402 | elif not self.buffer: |
| 403 | raise RuntimeError("buffer is empty") |
| 404 | elif n < 0: |
| 405 | raise ValueError("n should be > 0") |
| 406 | |
| 407 | if len(self.buffer[0]) == n and isinstance(self.buffer[0], bytes): |
| 408 | self._size -= n |
| 409 | return self.buffer.popleft() |
| 410 | |
| 411 | fetched = 0 |
| 412 | ret = io.BytesIO() |
| 413 | while fetched < n: |
| 414 | remaining = n - fetched |
| 415 | chunk = self.buffer.popleft() |
| 416 | chunk_length = len(chunk) |
| 417 | if remaining < chunk_length: |
| 418 | chunk = memoryview(chunk) |
| 419 | left_chunk, right_chunk = chunk[:remaining], chunk[remaining:] |
| 420 | ret.write(left_chunk) |
| 421 | self.buffer.appendleft(right_chunk) |
| 422 | self._size -= remaining |
| 423 | break |
| 424 | else: |
| 425 | ret.write(chunk) |
| 426 | self._size -= chunk_length |
| 427 | fetched += chunk_length |
| 428 | |
| 429 | if not self.buffer: |
| 430 | break |
| 431 |
no outgoing calls