newRing creates a ring from the endpoints stored in the EndpointMap. The ring size is bounded by the passed-in min/max. Ring entries are created for each endpoint, and endpoints with a higher weight (specified by the endpoint) may have multiple entries. For example, for endpoints with weights {a:3,b:3,c:4} and a ring of size 7, the generated entries are {a,a,b,b,c,c,c}.
(endpoints *resolver.EndpointMap[*endpointState], minRingSize, maxRingSize uint64, logger *grpclog.PrefixLogger)
| 69 | // |
| 70 | // Must be called with a non-empty endpoints map. |
| 71 | func newRing(endpoints *resolver.EndpointMap[*endpointState], minRingSize, maxRingSize uint64, logger *grpclog.PrefixLogger) *ring { |
| 72 | if logger.V(2) { |
| 73 | logger.Infof("newRing: number of endpoints is %d, minRingSize is %d, maxRingSize is %d", endpoints.Len(), minRingSize, maxRingSize) |
| 74 | } |
| 75 | |
| 76 | // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 |
| 77 | normalizedWeights, minWeight := normalizeWeights(endpoints) |
| 78 | if logger.V(2) { |
| 79 | logger.Infof("newRing: normalized endpoint weights is %v", normalizedWeights) |
| 80 | } |
| 81 | |
| 82 | // Normalized weights for {3,3,4} is {0.3,0.3,0.4}. |
| 83 | |
| 84 | // Scale up the size of the ring such that the least-weighted host gets a |
| 85 | // whole number of hashes on the ring. |
| 86 | // |
| 87 | // Note that size is limited by the input max/min. |
| 88 | scale := math.Min(math.Ceil(minWeight*float64(minRingSize))/minWeight, float64(maxRingSize)) |
| 89 | ringSize := math.Ceil(scale) |
| 90 | items := make([]*ringEntry, 0, int(ringSize)) |
| 91 | if logger.V(2) { |
| 92 | logger.Infof("newRing: creating new ring of size %v", ringSize) |
| 93 | } |
| 94 | |
| 95 | // For each entry, scale*weight nodes are generated in the ring. |
| 96 | // |
| 97 | // Not all of these are whole numbers. E.g. for weights {a:3,b:3,c:4}, if |
| 98 | // ring size is 7, scale is 6.66. The numbers of nodes will be |
| 99 | // {a,a,b,b,c,c,c}. |
| 100 | // |
| 101 | // A hash is generated for each item, and later the results will be sorted |
| 102 | // based on the hash. |
| 103 | var currentHashes, targetHashes float64 |
| 104 | for _, epInfo := range normalizedWeights { |
| 105 | targetHashes += scale * epInfo.scaledWeight |
| 106 | // This index ensures that ring entries corresponding to the same |
| 107 | // endpoint hash to different values. And since this index is |
| 108 | // per-endpoint, these entries hash to the same value across address |
| 109 | // updates. |
| 110 | idx := 0 |
| 111 | for currentHashes < targetHashes { |
| 112 | h := xxhash.Sum64String(epInfo.hashKey + "_" + strconv.Itoa(idx)) |
| 113 | items = append(items, &ringEntry{hash: h, hashKey: epInfo.hashKey, weight: epInfo.originalWeight}) |
| 114 | idx++ |
| 115 | currentHashes++ |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | // Sort items based on hash, to prepare for binary search. |
| 120 | sort.Slice(items, func(i, j int) bool { return items[i].hash < items[j].hash }) |
| 121 | for i, ii := range items { |
| 122 | ii.idx = i |
| 123 | } |
| 124 | return &ring{items: items} |
| 125 | } |
| 126 | |
| 127 | // normalizeWeights calculates the normalized weights for each endpoint in the |
| 128 | // given endpoints map. It returns a slice of endpointWithState structs, where |