MCPcopy
hub / github.com/grafana/tempo / openWALBlock

Function openWALBlock

tempodb/encoding/vparquet3/wal_block.go:54–149  ·  view source on GitHub ↗

path + filename = folder to create: path/folder/00001, /00002, /00003, /00004, where folder = <blockID>+<tenantID>+vParquet. openWALBlock opens an existing appendable block. It is read-only by not assigning a decoder. There's an interesting bug here that does not come into play due to the fact that we…

(filename, path string, ingestionSlack, _ time.Duration)

Source from the content-addressed store, hash-verified

52// calling b.openWriter() it will attempt to create a new file as path/folder/00002 which will overwrite the first file. as long as we never
53// append to this file it should be ok.
54func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (common.WALBlock, error, error) {
55 dir := filepath.Join(path, filename)
56 _, _, version, err := parseName(filename)
57 if err != nil {
58 return nil, nil, err
59 }
60
61 if version != VersionString {
62 return nil, nil, fmt.Errorf("mismatched version in vparquet wal: %s, %s, %s", version, path, filename)
63 }
64
65 metaPath := filepath.Join(dir, backend.MetaName)
66 metaBytes, err := os.ReadFile(metaPath)
67 if err != nil {
68 return nil, nil, fmt.Errorf("error reading wal meta json: %s: %w", metaPath, err)
69 }
70
71 meta := &backend.BlockMeta{}
72 err = json.Unmarshal(metaBytes, meta)
73 if err != nil {
74 return nil, nil, fmt.Errorf("error unmarshaling wal meta json: %s: %w", metaPath, err)
75 }
76
77 // below we're going to iterate all of the parquet files in the wal and build the meta, this will correctly
78 // recount total objects so clear them out here
79 meta.TotalObjects = 0
80
81 b := &walBlock{
82 meta: meta,
83 path: path,
84 ids: common.NewIDMap[int64](0),
85 ingestionSlack: ingestionSlack,
86 }
87
88 // read all files in dir
89 files, err := os.ReadDir(dir)
90 if err != nil {
91 return nil, nil, fmt.Errorf("error reading dir: %w", err)
92 }
93
94 var warning error
95 for _, f := range files {
96 if f.Name() == backend.MetaName {
97 continue
98 }
99
100 // Ignore 0-byte files which are pages that were
101 // opened but not flushed.
102 i, err := f.Info()
103 if err != nil {
104 return nil, nil, fmt.Errorf("error getting file info: %s: %w", f.Name(), err)
105 }
106 if i.Size() == 0 {
107 continue
108 }
109
110 path := filepath.Join(dir, f.Name())
111 page := newWalBlockFlush(path, common.NewIDMap[int64](0))

Callers 4

OpenWALBlockMethod · 0.70
TestPartialReplayFunction · 0.70
BenchmarkWalTraceQLFunction · 0.70

Calls 13

ObjectAddedMethod · 0.80
parseNameFunction · 0.70
newWalBlockFlushFunction · 0.70
makeIterFuncFunction · 0.70
JoinMethod · 0.65
UnmarshalMethod · 0.65
NameMethod · 0.65
SizeMethod · 0.65
CloseMethod · 0.65
NextMethod · 0.65
SetMethod · 0.65
fileMethod · 0.45

Tested by 3

TestPartialReplayFunction · 0.56
BenchmarkWalTraceQLFunction · 0.56