MCPcopy
hub / github.com/grafana/tempo / reloadBlocks

Method reloadBlocks

modules/livestore/live_store_background.go:305–448  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

303}
304
305func (s *LiveStore) reloadBlocks() error {
306 // Reclaim tombstoned blocks left by an unclean shutdown before
307 // reloading, so they don't get scanned as live.
308 if n, err := s.wal.ClearTombstonedBlocks(); err != nil {
309 level.Warn(s.logger).Log("msg", "failed to clear tombstoned wal blocks at startup", "err", err)
310 } else if n > 0 {
311 level.Info(s.logger).Log("msg", "cleared tombstoned wal blocks at startup", "count", n)
312 }
313 if n, err := s.wal.LocalBackend().ClearTombstonedBlocks(); err != nil {
314 level.Warn(s.logger).Log("msg", "failed to clear tombstoned complete blocks at startup", "err", err)
315 } else if n > 0 {
316 level.Info(s.logger).Log("msg", "cleared tombstoned complete blocks at startup", "count", n)
317 }
318
319 // ------------------------------------
320 // wal blocks
321 // ------------------------------------
322 level.Info(s.logger).Log("msg", "reloading wal blocks")
323 walBlocks, err := s.wal.RescanBlocks(0, s.logger)
324 if err != nil {
325 return fmt.Errorf("failed to rescan wal blocks: %w", err)
326 }
327
328 for _, blk := range walBlocks {
329 err := func() error {
330 meta := blk.BlockMeta()
331
332 inst, err := s.getOrCreateInstance(meta.TenantID)
333 if err != nil {
334 return fmt.Errorf("failed to get or create instance for tenant %s: %w", meta.TenantID, err)
335 }
336
337 inst.blocksMtx.Lock()
338 defer inst.blocksMtx.Unlock()
339
340 level.Info(s.logger).Log("msg", "reloaded wal block", "block", meta.BlockID.String())
341 inst.blocks.Store(inst.blocks.Load().withWALBlockAdded((uuid.UUID)(meta.BlockID), blk))
342
343 level.Info(s.logger).Log("msg", "queueing replayed wal block for completion", "block", meta.BlockID.String(), "size", blk.DataLength())
344 if err := s.enqueueCompleteOp(meta.TenantID, uuid.UUID(meta.BlockID), true); err != nil {
345 return fmt.Errorf("failed to enqueue wal block for completion for tenant %s: %w", meta.TenantID, err)
346 }
347
348 level.Info(s.logger).Log("msg", "reloaded wal blocks", "tenant", inst.tenantID, "count", len(inst.blocks.Load().walBlocks))
349
350 return nil
351 }()
352 if err != nil {
353 return err
354 }
355 }
356
357 level.Info(s.logger).Log("msg", "wal blocks to complete at startup", "count", len(walBlocks))
358
359 // ------------------------------------
360 // Complete blocks
361 // ------------------------------------
362 level.Info(s.logger).Log("msg", "reloading completed blocks")

Callers 1

starting (Method) · 0.95

Calls 15

getOrCreateInstance (Method) · 0.95
enqueueCompleteOp (Method) · 0.95
NewReader (Function) · 0.92
OpenBlock (Function) · 0.92
NewLocalBlock (Function) · 0.85
LocalBackend (Method) · 0.80
RescanBlocks (Method) · 0.80
withWALBlockAdded (Method) · 0.80
Log (Method) · 0.65
BlockMeta (Method) · 0.65
Store (Method) · 0.65

Tested by

no test coverage detected