()
| 303 | } |
| 304 | |
| 305 | func (s *LiveStore) reloadBlocks() error { |
| 306 | // Reclaim tombstoned blocks left by an unclean shutdown before |
| 307 | // reloading, so they don't get scanned as live. |
| 308 | if n, err := s.wal.ClearTombstonedBlocks(); err != nil { |
| 309 | level.Warn(s.logger).Log("msg", "failed to clear tombstoned wal blocks at startup", "err", err) |
| 310 | } else if n > 0 { |
| 311 | level.Info(s.logger).Log("msg", "cleared tombstoned wal blocks at startup", "count", n) |
| 312 | } |
| 313 | if n, err := s.wal.LocalBackend().ClearTombstonedBlocks(); err != nil { |
| 314 | level.Warn(s.logger).Log("msg", "failed to clear tombstoned complete blocks at startup", "err", err) |
| 315 | } else if n > 0 { |
| 316 | level.Info(s.logger).Log("msg", "cleared tombstoned complete blocks at startup", "count", n) |
| 317 | } |
| 318 | |
| 319 | // ------------------------------------ |
| 320 | // wal blocks |
| 321 | // ------------------------------------ |
| 322 | level.Info(s.logger).Log("msg", "reloading wal blocks") |
| 323 | walBlocks, err := s.wal.RescanBlocks(0, s.logger) |
| 324 | if err != nil { |
| 325 | return fmt.Errorf("failed to rescan wal blocks: %w", err) |
| 326 | } |
| 327 | |
| 328 | for _, blk := range walBlocks { |
| 329 | err := func() error { |
| 330 | meta := blk.BlockMeta() |
| 331 | |
| 332 | inst, err := s.getOrCreateInstance(meta.TenantID) |
| 333 | if err != nil { |
| 334 | return fmt.Errorf("failed to get or create instance for tenant %s: %w", meta.TenantID, err) |
| 335 | } |
| 336 | |
| 337 | inst.blocksMtx.Lock() |
| 338 | defer inst.blocksMtx.Unlock() |
| 339 | |
| 340 | level.Info(s.logger).Log("msg", "reloaded wal block", "block", meta.BlockID.String()) |
| 341 | inst.blocks.Store(inst.blocks.Load().withWALBlockAdded((uuid.UUID)(meta.BlockID), blk)) |
| 342 | |
| 343 | level.Info(s.logger).Log("msg", "queueing replayed wal block for completion", "block", meta.BlockID.String(), "size", blk.DataLength()) |
| 344 | if err := s.enqueueCompleteOp(meta.TenantID, uuid.UUID(meta.BlockID), true); err != nil { |
| 345 | return fmt.Errorf("failed to enqueue wal block for completion for tenant %s: %w", meta.TenantID, err) |
| 346 | } |
| 347 | |
| 348 | level.Info(s.logger).Log("msg", "reloaded wal blocks", "tenant", inst.tenantID, "count", len(inst.blocks.Load().walBlocks)) |
| 349 | |
| 350 | return nil |
| 351 | }() |
| 352 | if err != nil { |
| 353 | return err |
| 354 | } |
| 355 | } |
| 356 | |
| 357 | level.Info(s.logger).Log("msg", "wal blocks to complete at startup", "count", len(walBlocks)) |
| 358 | |
| 359 | // ------------------------------------ |
| 360 | // Complete blocks |
| 361 | // ------------------------------------ |
| 362 | level.Info(s.logger).Log("msg", "reloading completed blocks") |
no test coverage detected