MCPcopy
hub / github.com/grafana/dskit / GetReplicationSetForOperation

Method GetReplicationSetForOperation

ring/ring.go:712–792  ·  view source on GitHub ↗

GetReplicationSetForOperation implements ReadRing.

(op Operation)

Source from the content-addressed store, hash-verified

710
711// GetReplicationSetForOperation implements ReadRing.
712func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
713 r.mtx.RLock()
714 defer r.mtx.RUnlock()
715
716 if r.ringDesc == nil || len(r.ringTokens) == 0 {
717 return ReplicationSet{}, ErrEmptyRing
718 }
719
720 // Build the initial replication set, excluding unhealthy instances.
721 healthyInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters))
722 zoneFailures := make(map[string]struct{})
723 now := time.Now()
724
725 for _, instance := range r.ringDesc.Ingesters {
726 if r.IsHealthy(&instance, op, now) {
727 healthyInstances = append(healthyInstances, instance)
728 } else {
729 zoneFailures[instance.Zone] = struct{}{}
730 }
731 }
732
733 // Max errors and max unavailable zones are mutually exclusive. We initialise both
734 // to 0 and then we update them depending on whether zone-awareness is enabled or not.
735 maxErrors := 0
736 maxUnavailableZones := 0
737
738 if r.cfg.ZoneAwarenessEnabled {
739 // Given data is replicated to RF different zones, we can tolerate a number of
740 // RF/2 failing zones. However, we need to protect from the case the ring currently
741 // contains instances in a number of zones < RF.
742 numReplicatedZones := min(len(r.ringZones), r.cfg.ReplicationFactor)
743 minSuccessZones := (numReplicatedZones / 2) + 1
744 maxUnavailableZones = minSuccessZones - 1
745
746 if len(zoneFailures) > maxUnavailableZones {
747 return ReplicationSet{}, ErrTooManyUnhealthyInstances
748 }
749
750 if len(zoneFailures) > 0 {
751 // We remove all instances (even healthy ones) from zones with at least
752 // 1 failing instance. Due to how replication works when zone-awareness is
753 // enabled (data is replicated to RF different zones), there's no benefit in
754 // querying healthy instances from "failing zones". A zone is considered
755 // failed if there is a single error.
756 filteredInstances := make([]InstanceDesc, 0, len(r.ringDesc.Ingesters))
757 for _, instance := range healthyInstances {
758 if _, ok := zoneFailures[instance.Zone]; !ok {
759 filteredInstances = append(filteredInstances, instance)
760 }
761 }
762
763 healthyInstances = filteredInstances
764 }
765
766 // Since we removed all instances from zones containing at least 1 failing
767 // instance, we have to decrease the max unavailable zones accordingly.
768 maxUnavailableZones -= len(zoneFailures)
769 } else {

Callers

nothing calls this directly

Calls 1

IsHealthyMethod · 0.95

Tested by

no test coverage detected