| 84 | } |
| 85 | |
| 86 | func NewPartitionInstanceLifecycler(cfg PartitionInstanceLifecyclerConfig, ringName, ringKey string, store kv.Client, logger log.Logger, reg prometheus.Registerer) *PartitionInstanceLifecycler { |
| 87 | if cfg.PollingInterval == 0 { |
| 88 | cfg.PollingInterval = 5 * time.Second |
| 89 | } |
| 90 | |
| 91 | l := &PartitionInstanceLifecycler{ |
| 92 | cfg: cfg, |
| 93 | ringName: ringName, |
| 94 | ringKey: ringKey, |
| 95 | store: store, |
| 96 | logger: log.With(logger, "ring", ringName), |
| 97 | actorChan: make(chan func()), |
| 98 | createPartitionOnStartup: atomic.NewBool(true), |
| 99 | removeOwnerOnShutdown: atomic.NewBool(false), |
| 100 | reconcilesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ |
| 101 | Name: "partition_ring_lifecycler_reconciles_total", |
| 102 | Help: "Total number of reconciliations started.", |
| 103 | ConstLabels: map[string]string{"name": ringName}, |
| 104 | }, []string{"type"}), |
| 105 | reconcilesFailedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ |
| 106 | Name: "partition_ring_lifecycler_reconciles_failed_total", |
| 107 | Help: "Total number of reconciliations failed.", |
| 108 | ConstLabels: map[string]string{"name": ringName}, |
| 109 | }, []string{"type"}), |
| 110 | } |
| 111 | |
| 112 | l.BasicService = services.NewBasicService(l.starting, l.running, l.stopping) |
| 113 | |
| 114 | return l |
| 115 | } |
| 116 | |
| 117 | // CreatePartitionOnStartup returns whether the lifecycler creates the partition on startup |
| 118 | // if it doesn't exist. |