(error)
| 444 | } |
| 445 | |
| 446 | func (s *LiveStore) stopping(error) error { |
| 447 | level.Info(s.logger).Log("msg", "live store stopping", "remove_partition_owner", s.ingestPartitionLifecycler.RemoveOwnerOnShutdown()) |
| 448 | |
| 449 | // Reject new queries early in shutdown, before tearing down the reader. |
| 450 | s.readyErr.Store(&ErrStopping) |
| 451 | metricReady.Set(0) |
| 452 | |
| 453 | var stopErr error |
| 454 | |
| 455 | if s.cfg.ConsumeFromKafka { |
| 456 | // Stop the kafka lag background worker. |
| 457 | if s.lagCancel != nil { |
| 458 | s.lagCancel() |
| 459 | } |
| 460 | // Stop consuming. |
| 461 | if err := services.StopAndAwaitTerminated(context.Background(), s.reader); err != nil { |
| 462 | level.Warn(s.logger).Log("msg", "failed to stop reader", "err", err) |
| 463 | stopErr = errors.Join(stopErr, err) |
| 464 | } |
| 465 | |
| 466 | // Reset lag metrics for our partition when stopping. |
| 467 | ingest.ResetLagMetricsForRevokedPartitions(s.cfg.IngestConfig.Kafka.ConsumerGroup, []int32{s.ingestPartitionID}) |
| 468 | } |
| 469 | |
| 470 | // Stop both the membership ring and partition ring even if an earlier shutdown step failed. |
| 471 | if err := services.StopAndAwaitTerminated(context.Background(), s.livestoreLifecycler); err != nil { |
| 472 | level.Warn(s.logger).Log("msg", "failed to stop livestore lifecycler", "err", err) |
| 473 | stopErr = errors.Join(stopErr, err) |
| 474 | } |
| 475 | |
| 476 | if err := services.StopAndAwaitTerminated(context.Background(), s.ingestPartitionLifecycler); err != nil { |
| 477 | level.Warn(s.logger).Log("msg", "failed to stop partition lifecycler", "err", err) |
| 478 | stopErr = errors.Join(stopErr, err) |
| 479 | } |
| 480 | |
| 481 | level.Info(s.logger).Log("msg", "stopping periodic WAL flush goroutines") |
| 482 | s.stopAllCutToWalLoops() |
| 483 | level.Info(s.logger).Log("msg", "periodic WAL flush goroutines stopped") |
| 484 | |
| 485 | // Flush all data to disk. |
| 486 | level.Info(s.logger).Log("msg", "cutting all instances to WAL") |
| 487 | s.cutAllInstancesToWal() |
| 488 | level.Info(s.logger).Log("msg", "done cutting all instances to WAL") |
| 489 | |
| 490 | // Remove the shutdown marker if it exists since we are shutting down. |
| 491 | shutdownMarkerPath := shutdownmarker.GetPath(s.cfg.ShutdownMarkerDir) |
| 492 | if err := shutdownmarker.Remove(shutdownMarkerPath); err != nil { |
| 493 | level.Warn(s.logger).Log("msg", "failed to remove shutdown marker", "path", shutdownMarkerPath, "err", err) |
| 494 | stopErr = errors.Join(stopErr, err) |
| 495 | } |
| 496 | |
| 497 | if s.cfg.holdAllBackgroundProcesses { // nothing to do |
| 498 | return stopErr |
| 499 | } |
| 500 | |
| 501 | ticker := time.NewTicker(100 * time.Millisecond) |
| 502 | defer ticker.Stop() |
| 503 |
nothing calls this directly
no test coverage detected