fastJoinMembersOnStartup attempts to reach a small subset of nodes (computed as RetransmitMult * ceil(log10(number of discovered members + 1))).
(ctx context.Context)
| 748 | |
| 749 | // fastJoinMembersOnStartup attempts to reach small subset of nodes (computed as RetransmitMult * log10(number of discovered members + 1)). |
| 750 | func (m *KV) fastJoinMembersOnStartup(ctx context.Context) error { |
| 751 | startTime := time.Now() |
| 752 | |
| 753 | nodes, err := m.discoverMembersWithRetries(ctx, m.cfg.JoinMembers) |
| 754 | if err != nil && len(nodes) == 0 { |
| 755 | return err |
| 756 | } |
| 757 | |
| 758 | // Shuffle the node addresses to randomize the ones picked for the fast join. |
| 759 | math_rand.Shuffle(len(nodes), func(i, j int) { |
| 760 | nodes[i], nodes[j] = nodes[j], nodes[i] |
| 761 | }) |
| 762 | |
| 763 | // This is the same formula as used by memberlist for number of nodes that a single message should be gossiped to. |
| 764 | toJoin := m.cfg.RetransmitMult * int(math.Ceil(math.Log10(float64(len(nodes)+1)))) |
| 765 | |
| 766 | level.Info(m.logger).Log("msg", "memberlist fast-join starting", "nodes_found", len(nodes), "to_join", toJoin) |
| 767 | |
| 768 | totalJoined := 0 |
| 769 | for toJoin > 0 && len(nodes) > 0 && ctx.Err() == nil { |
| 770 | reached, err := m.memberlist.Join(nodes[0:1]) // Try to join single node only. |
| 771 | if err != nil { |
| 772 | level.Info(m.logger).Log("msg", "fast-joining node failed", "node", nodes[0], "err", err) |
| 773 | } |
| 774 | |
| 775 | totalJoined += reached |
| 776 | toJoin -= reached |
| 777 | |
| 778 | nodes = nodes[1:] |
| 779 | } |
| 780 | |
| 781 | if totalJoined < m.cfg.AbortIfFastJoinFailsMinNodes { |
| 782 | level.Warn(m.logger).Log("msg", "memberlist fast-join failed to reach minimum required seed nodes", "joined_nodes", totalJoined, "required_nodes", m.cfg.AbortIfFastJoinFailsMinNodes, "elapsed_time", time.Since(startTime)) |
| 783 | return fmt.Errorf("fast-join failed to reach minimum required seed nodes: joined %d, required %d", totalJoined, m.cfg.AbortIfFastJoinFailsMinNodes) |
| 784 | } |
| 785 | |
| 786 | level.Info(m.logger).Log("msg", "memberlist fast-join finished", "joined_nodes", totalJoined, "elapsed_time", time.Since(startTime)) |
| 787 | return nil |
| 788 | } |
| 789 | |
| 790 | // The joinMembersOnStartup method resolves the addresses of the given join_members hosts and asks memberlist to join to them. |
| 791 | // This method cannot be called before KV.running state as it may wait for K8S DNS to resolve the service addresses of members |
no test coverage detected