GetReplicationSetForOperation implements ReadRing.
(op Operation)
| 710 | |
| 711 | // GetReplicationSetForOperation implements ReadRing. |
| 712 | func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) { |
| 713 | r.mtx.RLock() |
| 714 | defer r.mtx.RUnlock() |
| 715 | |
| 716 | if r.ringDesc == nil || len(r.ringTokens) == 0 { |
| 717 | return ReplicationSet{}, ErrEmptyRing |
| 718 | } |
| 719 | |
| 720 | // Build the initial replication set, excluding unhealthy instances. |
| 721 | healthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters)) |
| 722 | zoneFailures := make(map[string]struct{}) |
| 723 | now := time.Now() |
| 724 | |
| 725 | for _, instance := range r.ringDesc.Ingesters { |
| 726 | if r.IsHealthy(&instance, op, now) { |
| 727 | healthyInstances = append(healthyInstances, instance) |
| 728 | } else { |
| 729 | zoneFailures[instance.Zone] = struct{}{} |
| 730 | } |
| 731 | } |
| 732 | |
| 733 | // Max errors and max unavailable zones are mutually exclusive. We initialise both |
| 734 | // to 0 and then we update them whether zone-awareness is enabled or not. |
| 735 | maxErrors := 0 |
| 736 | maxUnavailableZones := 0 |
| 737 | |
| 738 | if r.cfg.ZoneAwarenessEnabled { |
| 739 | // Given data is replicated to RF different zones, we can tolerate a number of |
| 740 | // RF/2 failing zones. However, we need to protect from the case the ring currently |
| 741 | // contains instances in a number of zones < RF. |
| 742 | numReplicatedZones := min(len(r.ringZones), r.cfg.ReplicationFactor) |
| 743 | minSuccessZones := (numReplicatedZones / 2) + 1 |
| 744 | maxUnavailableZones = minSuccessZones - 1 |
| 745 | |
| 746 | if len(zoneFailures) > maxUnavailableZones { |
| 747 | return ReplicationSet{}, ErrTooManyUnhealthyInstances |
| 748 | } |
| 749 | |
| 750 | if len(zoneFailures) > 0 { |
| 751 | // We remove all instances (even healthy ones) from zones with at least |
| 752 | // 1 failing instance. Due to how replication works when zone-awareness is |
| 753 | // enabled (data is replicated to RF different zones), there's no benefit in |
| 754 | // querying healthy instances from "failing zones". A zone is considered |
| 755 | // failed if there is single error. |
| 756 | filteredInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters)) |
| 757 | for _, instance := range healthyInstances { |
| 758 | if _, ok := zoneFailures[instance.Zone]; !ok { |
| 759 | filteredInstances = append(filteredInstances, instance) |
| 760 | } |
| 761 | } |
| 762 | |
| 763 | healthyInstances = filteredInstances |
| 764 | } |
| 765 | |
| 766 | // Since we removed all instances from zones containing at least 1 failing |
| 767 | // instance, we have to decrease the max unavailable zones accordingly. |
| 768 | maxUnavailableZones -= len(zoneFailures) |
| 769 | } else { |