path + filename = folder to create: path/folder/00001, /00002, /00003, /00004, where folder = <blockID>+<tenantID>+vParquet. openWALBlock opens an existing appendable block. It is read-only by not assigning a decoder. There's an interesting bug here that does not come into play due to the fact that we
(filename, path string, ingestionSlack, _ time.Duration)
| 52 | // calling b.openWriter() it will attempt to create a new file as path/folder/00002 which will overwrite the first file. As long as we never |
| 53 | // append to this file it should be ok. |
| 54 | func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (common.WALBlock, error, error) { |
| 55 | dir := filepath.Join(path, filename) |
| 56 | _, _, version, err := parseName(filename) |
| 57 | if err != nil { |
| 58 | return nil, nil, err |
| 59 | } |
| 60 | |
| 61 | if version != VersionString { |
| 62 | return nil, nil, fmt.Errorf("mismatched version in vparquet wal: %s, %s, %s", version, path, filename) |
| 63 | } |
| 64 | |
| 65 | metaPath := filepath.Join(dir, backend.MetaName) |
| 66 | metaBytes, err := os.ReadFile(metaPath) |
| 67 | if err != nil { |
| 68 | return nil, nil, fmt.Errorf("error reading wal meta json: %s: %w", metaPath, err) |
| 69 | } |
| 70 | |
| 71 | meta := &backend.BlockMeta{} |
| 72 | err = json.Unmarshal(metaBytes, meta) |
| 73 | if err != nil { |
| 74 | return nil, nil, fmt.Errorf("error unmarshaling wal meta json: %s: %w", metaPath, err) |
| 75 | } |
| 76 | |
| 77 | // below we're going to iterate all of the parquet files in the wal and build the meta, this will correctly |
| 78 | // recount total objects so clear them out here |
| 79 | meta.TotalObjects = 0 |
| 80 | |
| 81 | b := &walBlock{ |
| 82 | meta: meta, |
| 83 | path: path, |
| 84 | ids: common.NewIDMap[int64](0), |
| 85 | ingestionSlack: ingestionSlack, |
| 86 | } |
| 87 | |
| 88 | // read all files in dir |
| 89 | files, err := os.ReadDir(dir) |
| 90 | if err != nil { |
| 91 | return nil, nil, fmt.Errorf("error reading dir: %w", err) |
| 92 | } |
| 93 | |
| 94 | var warning error |
| 95 | for _, f := range files { |
| 96 | if f.Name() == backend.MetaName { |
| 97 | continue |
| 98 | } |
| 99 | |
| 100 | // Ignore 0-byte files which are pages that were |
| 101 | // opened but not flushed. |
| 102 | i, err := f.Info() |
| 103 | if err != nil { |
| 104 | return nil, nil, fmt.Errorf("error getting file info: %s: %w", f.Name(), err) |
| 105 | } |
| 106 | if i.Size() == 0 { |
| 107 | continue |
| 108 | } |
| 109 | |
| 110 | path := filepath.Join(dir, f.Name()) |
| 111 | page := newWalBlockFlush(path, common.NewIDMap[int64](0)) |