MCPcopy
hub / github.com/grafana/dskit / NewLifecycler

Function NewLifecycler

ring/lifecycler.go:190–250  ·  view source on GitHub ↗

NewLifecycler creates new Lifecycler. It must be started via StartAsync.

(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer)

Source from the content-addressed store, hash-verified

188
189// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
190func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error) {
191 addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger, cfg.EnableInet6)
192 if err != nil {
193 return nil, err
194 }
195 port := GetInstancePort(cfg.Port, cfg.ListenPort)
196 codec := GetCodec()
197 // Suffix all client names with "-lifecycler" to denote this kv client is used by the lifecycler
198 store, err := kv.NewClient(
199 cfg.RingConfig.KVStore,
200 codec,
201 kv.RegistererWithKVName(reg, ringName+"-lifecycler"),
202 logger,
203 )
204 if err != nil {
205 return nil, err
206 }
207
208 // We do allow a nil FlushTransferer, but to keep the ring logic easier we assume
209 // it's always set, so we use a noop FlushTransferer
210 if flushTransferer == nil {
211 flushTransferer = NewNoopFlushTransferer()
212 }
213
214 tokenGenerator := cfg.RingTokenGenerator
215 if tokenGenerator == nil {
216 tokenGenerator = NewRandomTokenGenerator()
217 }
218
219 // We validate cfg before we create a Lifecycler.
220 err = cfg.Validate()
221 if err != nil {
222 return nil, err
223 }
224
225 l := &Lifecycler{
226 cfg: cfg,
227 flushTransferer: flushTransferer,
228 KVStore: store,
229 Addr: net.JoinHostPort(addr, strconv.Itoa(port)),
230 ID: cfg.ID,
231 RingName: ringName,
232 RingKey: ringKey,
233 flushOnShutdown: atomic.NewBool(flushOnShutdown),
234 unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown),
235 clearTokensOnShutdown: atomic.NewBool(false),
236 Zone: cfg.Zone,
237 actorChan: make(chan func()),
238 state: PENDING,
239 tokenGenerator: tokenGenerator,
240 canJoinTimeout: 5 * time.Minute,
241 lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
242 logger: logger,
243 }
244
245 l.BasicService = services.
246 NewBasicService(nil, l.loop, l.stopping).
247 WithName(fmt.Sprintf("%s ring lifecycler", ringName))

Calls 11

NewClientFunction · 0.92
RegistererWithKVNameFunction · 0.92
NewBasicServiceFunction · 0.92
GetInstanceAddrFunction · 0.85
GetInstancePortFunction · 0.85
GetCodecFunction · 0.85
NewNoopFlushTransfererFunction · 0.85
NewRandomTokenGeneratorFunction · 0.85
NewLifecyclerMetricsFunction · 0.85
WithNameMethod · 0.80
ValidateMethod · 0.45