file() opens the parquet file and returns it. Previously this method cached the file on first open, but the memory cost of this was quite high, so instead we open it fresh every time. This also allows it to take the context from the caller.
(ctx context.Context)
| 220 | // but the memory cost of this was quite high. so instead we open it fresh every time. This |
| 221 | // also allows it to take the context for the caller. |
| 222 | func (w *walBlockFlush) file(ctx context.Context) (*pageFile, error) { |
| 223 | if err := ctx.Err(); err != nil { |
| 224 | return nil, err |
| 225 | } |
| 226 | |
| 227 | file, err := os.OpenFile(w.path, os.O_RDONLY, 0o600) |
| 228 | if err != nil { |
| 229 | return nil, fmt.Errorf("error opening file: %w", err) |
| 230 | } |
| 231 | info, err := file.Stat() |
| 232 | if err != nil { |
| 233 | return nil, fmt.Errorf("error getting file info: %w", err) |
| 234 | } |
| 235 | size := info.Size() |
| 236 | |
| 237 | wr := newWalReaderAt(ctx, file) |
| 238 | o := []parquet.FileOption{ |
| 239 | parquet.SkipBloomFilters(true), |
| 240 | parquet.SkipPageIndex(true), |
| 241 | parquet.FileSchema(parquetSchema), |
| 242 | parquet.FileReadMode(parquet.ReadModeAsync), |
| 243 | } |
| 244 | |
| 245 | pf, err := parquet.OpenFile(wr, size, o...) |
| 246 | if err != nil { |
| 247 | return nil, fmt.Errorf("error opening parquet file: %w", err) |
| 248 | } |
| 249 | |
| 250 | f := &pageFile{parquetFile: pf, osFile: file, r: wr} |
| 251 | |
| 252 | return f, nil |
| 253 | } |
| 254 | |
| 255 | func (w *walBlockFlush) rowIterator(ctx context.Context) (*rowIterator, error) { |
| 256 | file, err := w.file(ctx) |
no test coverage detected