MCPcopy
hub / github.com/grafana/tempo / stopping

Method stopping

modules/livestore/live_store.go:446–511  ·  view source on GitHub ↗
(error)

Source from the content-addressed store, hash-verified

444}
445
446func (s *LiveStore) stopping(error) error {
447 level.Info(s.logger).Log("msg", "live store stopping", "remove_partition_owner", s.ingestPartitionLifecycler.RemoveOwnerOnShutdown())
448
449 // Reject new queries early in shutdown, before tearing down the reader.
450 s.readyErr.Store(&ErrStopping)
451 metricReady.Set(0)
452
453 var stopErr error
454
455 if s.cfg.ConsumeFromKafka {
456 // Stop the kafka lag background worker.
457 if s.lagCancel != nil {
458 s.lagCancel()
459 }
460 // Stop consuming.
461 if err := services.StopAndAwaitTerminated(context.Background(), s.reader); err != nil {
462 level.Warn(s.logger).Log("msg", "failed to stop reader", "err", err)
463 stopErr = errors.Join(stopErr, err)
464 }
465
466 // Reset lag metrics for our partition when stopping.
467 ingest.ResetLagMetricsForRevokedPartitions(s.cfg.IngestConfig.Kafka.ConsumerGroup, []int32{s.ingestPartitionID})
468 }
469
470 // Stop both the membership ring and partition ring even if an earlier shutdown step failed.
471 if err := services.StopAndAwaitTerminated(context.Background(), s.livestoreLifecycler); err != nil {
472 level.Warn(s.logger).Log("msg", "failed to stop livestore lifecycler", "err", err)
473 stopErr = errors.Join(stopErr, err)
474 }
475
476 if err := services.StopAndAwaitTerminated(context.Background(), s.ingestPartitionLifecycler); err != nil {
477 level.Warn(s.logger).Log("msg", "failed to stop partition lifecycler", "err", err)
478 stopErr = errors.Join(stopErr, err)
479 }
480
481 level.Info(s.logger).Log("msg", "stopping periodic WAL flush goroutines")
482 s.stopAllCutToWalLoops()
483 level.Info(s.logger).Log("msg", "periodic WAL flush goroutines stopped")
484
485 // Flush all data to disk.
486 level.Info(s.logger).Log("msg", "cutting all instances to WAL")
487 s.cutAllInstancesToWal()
488 level.Info(s.logger).Log("msg", "done cutting all instances to WAL")
489
490 // Remove the shutdown marker if it exists since we are shutting down.
491 shutdownMarkerPath := shutdownmarker.GetPath(s.cfg.ShutdownMarkerDir)
492 if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil {
493 level.Warn(s.logger).Log("msg", "failed to remove shutdown marker", "path", shutdownMarkerPath, "err", err)
494 stopErr = errors.Join(stopErr, err)
495 }
496
497 if s.cfg.holdAllBackgroundProcesses { // nothing to do
498 return stopErr
499 }
500
501 ticker := time.NewTicker(100 * time.Millisecond)
502 defer ticker.Stop()
503

Callers

nothing calls this directly

Calls 11

stopAllCutToWalLoopsMethod · 0.95
cutAllInstancesToWalMethod · 0.95
GetPathFunction · 0.92
RemoveFunction · 0.92
LogMethod · 0.65
StoreMethod · 0.65
SetMethod · 0.65
JoinMethod · 0.65
StopMethod · 0.65

Tested by

no test coverage detected