NewLifecycler creates a new Lifecycler. It must be started via StartAsync.
(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer)
| 188 | |
| 189 | // NewLifecycler creates a new Lifecycler. It must be started via StartAsync. |
| 190 | func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error) { |
| 191 | addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger, cfg.EnableInet6) |
| 192 | if err != nil { |
| 193 | return nil, err |
| 194 | } |
| 195 | port := GetInstancePort(cfg.Port, cfg.ListenPort) |
| 196 | codec := GetCodec() |
| 197 | // Suffix all client names with "-lifecycler" to denote this kv client is used by the lifecycler |
| 198 | store, err := kv.NewClient( |
| 199 | cfg.RingConfig.KVStore, |
| 200 | codec, |
| 201 | kv.RegistererWithKVName(reg, ringName+"-lifecycler"), |
| 202 | logger, |
| 203 | ) |
| 204 | if err != nil { |
| 205 | return nil, err |
| 206 | } |
| 207 | |
| 208 | // We do allow a nil FlushTransferer, but to keep the ring logic easier we assume |
| 209 | // it's always set, so we use a noop FlushTransferer |
| 210 | if flushTransferer == nil { |
| 211 | flushTransferer = NewNoopFlushTransferer() |
| 212 | } |
| 213 | |
| 214 | tokenGenerator := cfg.RingTokenGenerator |
| 215 | if tokenGenerator == nil { |
| 216 | tokenGenerator = NewRandomTokenGenerator() |
| 217 | } |
| 218 | |
| 219 | // We validate cfg before we create a Lifecycler. |
| 220 | err = cfg.Validate() |
| 221 | if err != nil { |
| 222 | return nil, err |
| 223 | } |
| 224 | |
| 225 | l := &Lifecycler{ |
| 226 | cfg: cfg, |
| 227 | flushTransferer: flushTransferer, |
| 228 | KVStore: store, |
| 229 | Addr: net.JoinHostPort(addr, strconv.Itoa(port)), |
| 230 | ID: cfg.ID, |
| 231 | RingName: ringName, |
| 232 | RingKey: ringKey, |
| 233 | flushOnShutdown: atomic.NewBool(flushOnShutdown), |
| 234 | unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), |
| 235 | clearTokensOnShutdown: atomic.NewBool(false), |
| 236 | Zone: cfg.Zone, |
| 237 | actorChan: make(chan func()), |
| 238 | state: PENDING, |
| 239 | tokenGenerator: tokenGenerator, |
| 240 | canJoinTimeout: 5 * time.Minute, |
| 241 | lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), |
| 242 | logger: logger, |
| 243 | } |
| 244 | |
| 245 | l.BasicService = services. |
| 246 | NewBasicService(nil, l.loop, l.stopping). |
| 247 | WithName(fmt.Sprintf("%s ring lifecycler", ringName)) |