NewKV creates a new gossip-based KV service. Note that the service needs to be started; until then, it doesn't initialize the gossiping part. Only after the service is in the Running state is it really gossiping. Starting the service will also trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, an error is returned and the service enters the Failed state.
(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer)
| 456 | // trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned |
| 457 | // and service enters Failed state. |
| 458 | func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV { |
| 459 | cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace |
| 460 | |
| 461 | mlkv := &KV{ |
| 462 | cfg: cfg, |
| 463 | logger: logger, |
| 464 | registerer: registerer, |
| 465 | provider: dnsProvider, |
| 466 | store: make(map[string]ValueDesc), |
| 467 | codecs: make(map[string]codec.Codec), |
| 468 | watchers: make(map[string][]chan string), |
| 469 | keyNotifications: make(map[string]struct{}), |
| 470 | prefixWatchers: make(map[string][]chan string), |
| 471 | workersChannels: make(map[string]chan valueUpdate), |
| 472 | shutdown: make(chan struct{}), |
| 473 | maxCasRetries: maxCasRetries, |
| 474 | } |
| 475 | |
| 476 | mlkv.createAndRegisterMetrics() |
| 477 | |
| 478 | for _, c := range cfg.Codecs { |
| 479 | mlkv.codecs[c.CodecID()] = c |
| 480 | } |
| 481 | |
| 482 | // Register propagation delay tracker codec if enabled. |
| 483 | if cfg.PropagationDelayTracker.Enabled { |
| 484 | mlkv.codecs[PropagationDelayTrackerCodecID] = GetPropagationDelayTrackerCodec() |
| 485 | } |
| 486 | |
| 487 | mlkv.NamedService = services.NewBasicService(mlkv.starting, mlkv.running, mlkv.stopping).WithName("memberlist_kv") |
| 488 | |
| 489 | return mlkv |
| 490 | } |
| 491 | |
| 492 | func defaultMemberlistConfig() *memberlist.Config { |
| 493 | return memberlist.DefaultLANConfig() |