joinMembersInBatches joins the given members in batches of up to 100 nodes, re-resolving their addresses before each batch. It returns the number of nodes joined, and returns an error only when the number of joined nodes is 0.
(ctx context.Context, members []string)
| 848 | // joinMembersInBatches returns the number of nodes joined. joinMembersInBatches returns an error only when the |
| 849 | // number of joined nodes is 0. |
| 850 | func (m *KV) joinMembersInBatches(ctx context.Context, members []string) (int, error) { |
| 851 | const batchSize = 100 |
| 852 | var ( |
| 853 | attemptedNodes = make(map[string]bool) |
| 854 | successfullyJoined = 0 |
| 855 | lastErr error |
| 856 | batch = make([]string, batchSize) |
| 857 | nodes []string |
| 858 | ) |
| 859 | for moreAvailableNodes := true; ctx.Err() == nil && moreAvailableNodes; { |
| 860 | // Rediscover nodes and try to join a subset of them with each batch. |
| 861 | // When the list of nodes is large by the time we reach the end of the list some of the |
| 862 | // IPs can be unreachable. |
| 863 | // |
| 864 | // Ignores any DNS resolution error because it's not really actionable in this |
| 865 | // context. |
| 866 | newlyResolved, _ := m.discoverMembersWithRetries(ctx, members) |
| 867 | if len(newlyResolved) > 0 { |
| 868 | // If the resolution fails we keep using the nodes list from the last resolution. |
| 869 | // If that failed too, then we fail the join attempt. |
| 870 | nodes = newlyResolved |
| 871 | } |
| 872 | |
| 873 | // Prepare batch |
| 874 | batch = batch[:0] |
| 875 | moreAvailableNodes = false |
| 876 | for _, n := range nodes { |
| 877 | if attemptedNodes[n] { |
| 878 | continue |
| 879 | } |
| 880 | if len(batch) >= batchSize { |
| 881 | moreAvailableNodes = true |
| 882 | break |
| 883 | } |
| 884 | batch = append(batch, n) |
| 885 | attemptedNodes[n] = true |
| 886 | } |
| 887 | |
| 888 | // Join batch |
| 889 | joinedInBatch, err := m.joinMembersBatch(ctx, batch) |
| 890 | if err != nil { |
| 891 | lastErr = err |
| 892 | } |
| 893 | successfullyJoined += joinedInBatch |
| 894 | } |
| 895 | if successfullyJoined > 0 { |
| 896 | return successfullyJoined, nil |
| 897 | } |
| 898 | if successfullyJoined == 0 && lastErr == nil { |
| 899 | return 0, errors.New("found no nodes to join") |
| 900 | } |
| 901 | return 0, lastErr |
| 902 | } |
| 903 | |
| 904 | // joinMembersBatch returns an error only if it couldn't successfully join any nodes or if ctx is cancelled. |
| 905 | func (m *KV) joinMembersBatch(ctx context.Context, nodes []string) (successfullyJoined int, lastErr error) { |
no test coverage detected