(self, offset: int, length: int)
| 474 | pass |
| 475 | |
def _process_columntext_subheader(self, offset: int, length: int) -> None:
    """Store one column-text block and, for the first block, read file metadata.

    The raw text block found at ``offset`` (after skipping one integer-sized
    field) is stripped of trailing NUL/space bytes and appended to
    ``self.column_names_raw``.

    When this is the first block collected, the method additionally:

    * sets ``self.compression`` to the last entry of
      ``const.compression_literals`` contained in the block (``b""`` if none);
    * inspects a field near the start of the subheader to decide where the
      creator string lives, and stores it in ``self.creator_proc`` after
      passing it through ``self._convert_header_text``.

    Parameters
    ----------
    offset : int
        Position of the subheader in the current page buffer.
    length : int
        Unused here; accepted so all subheader processors share a signature.
    """
    offset += self._int_length
    text_block_size = self._read_uint(offset, const.text_block_size_length)

    buf = self._read_bytes(offset, text_block_size)
    cname_raw = buf[0:text_block_size].rstrip(b"\x00 ")
    self.column_names_raw.append(cname_raw)

    # Compression / creator metadata is only looked up in the first block.
    if len(self.column_names_raw) != 1:
        return

    # No ``break``: if several literals match, the last one wins.
    compression_literal = b""
    for cl in const.compression_literals:
        if cl in cname_raw:
            compression_literal = cl
    self.compression = compression_literal

    # Undo the skip applied above; the fixed offsets below are relative to
    # the start of the subheader.
    offset -= self._int_length
    # Every fixed offset below is 4 bytes further along when ``self.U64`` is set.
    u64_shift = 4 if self.U64 else 0

    buf = self._read_bytes(offset + 16 + u64_shift, self._lcp)
    compression_literal = buf.rstrip(b"\x00")
    if compression_literal == b"":
        self._lcs = 0
        buf = self._read_bytes(offset + 32 + u64_shift, self._lcp)
        self.creator_proc = buf[0 : self._lcp]
    elif compression_literal == const.rle_compression:
        buf = self._read_bytes(offset + 40 + u64_shift, self._lcp)
        self.creator_proc = buf[0 : self._lcp]
    elif self._lcs > 0:
        self._lcp = 0
        buf = self._read_bytes(offset + 16 + u64_shift, self._lcs)
        # Slice by ``_lcs`` (the length just read). Slicing by ``_lcp``,
        # which was zeroed two lines up, always produced an empty value and
        # threw away the bytes that had just been read.
        self.creator_proc = buf[0 : self._lcs]
    # ``creator_proc`` is only assigned by the branches above, so it may be
    # absent if none of them matched.
    if hasattr(self, "creator_proc"):
        self.creator_proc = self._convert_header_text(self.creator_proc)  # pyright: ignore[reportArgumentType]
| 520 | |
| 521 | def _process_columnname_subheader(self, offset: int, length: int) -> None: |
| 522 | int_len = self._int_length |
nothing calls this directly
no test coverage detected