| 177 | } |
| 178 | |
| 179 | func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) { |
| 180 | ret := EndpointsUpdate{} |
| 181 | for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { |
| 182 | ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) |
| 183 | } |
| 184 | priorities := make(map[uint32]map[string]bool) |
| 185 | sumOfWeights := make(map[uint32]uint64) |
| 186 | uniqueEndpointAddrs := make(map[string]bool) |
| 187 | for _, locality := range m.Endpoints { |
| 188 | l := locality.GetLocality() |
| 189 | if l == nil { |
| 190 | return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) |
| 191 | } |
| 192 | weight := locality.GetLoadBalancingWeight().GetValue() |
| 193 | if weight == 0 { |
| 194 | logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l)) |
| 195 | continue |
| 196 | } |
| 197 | priority := locality.GetPriority() |
| 198 | sumOfWeights[priority] += uint64(weight) |
| 199 | if sumOfWeights[priority] > math.MaxUint32 { |
| 200 | return EndpointsUpdate{}, fmt.Errorf("sum of weights of localities at the same priority %d exceeded maximal value", priority) |
| 201 | } |
| 202 | localitiesWithPriority := priorities[priority] |
| 203 | if localitiesWithPriority == nil { |
| 204 | localitiesWithPriority = make(map[string]bool) |
| 205 | priorities[priority] = localitiesWithPriority |
| 206 | } |
| 207 | lid := clients.Locality{ |
| 208 | Region: l.Region, |
| 209 | Zone: l.Zone, |
| 210 | SubZone: l.SubZone, |
| 211 | } |
| 212 | lidStr := xdsinternal.LocalityString(lid) |
| 213 | |
| 214 | // "Since an xDS configuration can place a given locality under multiple |
| 215 | // priorities, it is possible to see locality weight attributes with |
| 216 | // different values for the same locality." - A52 |
| 217 | // |
| 218 | // This is handled in the client by emitting the locality weight |
| 219 | // specified for the priority it is specified in. If the same locality |
| 220 | // has a different weight in two priorities, each priority will specify |
| 221 | // a locality with the locality weight specified for that priority, and |
| 222 | // thus the subsequent tree of balancers linked to that priority will |
| 223 | // use that locality weight as well. |
| 224 | if localitiesWithPriority[lidStr] { |
| 225 | return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority) |
| 226 | } |
| 227 | localitiesWithPriority[lidStr] = true |
| 228 | endpoints, err := parseEndpoints(locality.GetLbEndpoints(), uniqueEndpointAddrs) |
| 229 | if err != nil { |
| 230 | return EndpointsUpdate{}, err |
| 231 | } |
| 232 | var localityMetadata map[string]any |
| 233 | if envconfig.XDSHTTPConnectEnabled { |
| 234 | var err error |
| 235 | localityMetadata, err = validateAndConstructMetadata(locality.GetMetadata()) |
| 236 | if err != nil { |