| 494 | } |
| 495 | |
| 496 | func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { |
| 497 | tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger, m.registerer) |
| 498 | if err != nil { |
| 499 | return nil, fmt.Errorf("failed to create transport: %v", err) |
| 500 | } |
| 501 | |
| 502 | mlCfg := defaultMemberlistConfig() |
| 503 | mlCfg.Delegate = m |
| 504 | |
| 505 | mlCfg.TCPTimeout = m.cfg.StreamTimeout |
| 506 | mlCfg.RetransmitMult = m.cfg.RetransmitMult |
| 507 | mlCfg.PushPullInterval = m.cfg.PushPullInterval |
| 508 | mlCfg.GossipInterval = m.cfg.GossipInterval |
| 509 | mlCfg.GossipNodes = m.cfg.GossipNodes |
| 510 | mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime |
| 511 | mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime |
| 512 | mlCfg.EnableCompression = m.cfg.EnableCompression |
| 513 | mlCfg.CompressionAlgorithm = memberlist.CompressionAlgorithm(m.cfg.CompressionAlgorithm) |
| 514 | mlCfg.HandoffQueueDepth = m.cfg.ReceivedMessagesQueueSize |
| 515 | |
| 516 | mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr |
| 517 | mlCfg.AdvertisePort = m.cfg.AdvertisePort |
| 518 | |
| 519 | mlCfg.Label = m.cfg.ClusterLabel |
| 520 | mlCfg.SkipInboundLabelCheck = m.cfg.ClusterLabelVerificationDisabled |
| 521 | |
| 522 | if m.cfg.NodeName != "" { |
| 523 | mlCfg.Name = m.cfg.NodeName |
| 524 | } |
| 525 | if m.cfg.RandomizeNodeName { |
| 526 | mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix(m.logger) |
| 527 | } |
| 528 | |
| 529 | mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false) |
| 530 | mlCfg.Transport = tr |
| 531 | |
| 532 | // Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet". |
| 533 | // As we don't use UDP for sending packets, we can use higher value here. |
| 534 | mlCfg.UDPBufferSize = 10 * 1024 * 1024 |
| 535 | |
| 536 | // For our use cases, we don't need a very fast detection of dead nodes. Since we use a TCP transport |
| 537 | // and we open a new TCP connection for each packet, we prefer to reduce the probe frequency and increase |
| 538 | // the timeout compared to defaults. |
| 539 | if m.cfg.probeInterval > 0 { |
| 540 | mlCfg.ProbeInterval = m.cfg.probeInterval |
| 541 | mlCfg.ProbeTimeout = m.cfg.probeInterval / 2 |
| 542 | } else { |
| 543 | mlCfg.ProbeInterval = 5 * time.Second // Probe a random node every this interval. This setting is also the total timeout for the direct + indirect probes. |
| 544 | mlCfg.ProbeTimeout = 2 * time.Second // Timeout for the direct probe. |
| 545 | } |
| 546 | |
| 547 | // Since we use a custom transport based on TCP, having TCP-based fallbacks doesn't give us any benefit. |
| 548 | // On the contrary, if we keep TCP pings enabled, each node will effectively run 2x pings against a dead |
| 549 | // node, because the TCP-based fallback will always trigger. |
| 550 | mlCfg.DisableTcpPings = true |
| 551 | |
| 552 | // Configure zone-aware routing if enabled. |
| 553 | if m.cfg.ZoneAwareRouting.Enabled { |