(ctx context.Context, r ReadRing, op Operation, minStability, maxWaiting time.Duration, isChanged func(ReplicationSet, ReplicationSet) bool)
| 74 | } |
| 75 | |
| 76 | func waitStability(ctx context.Context, r ReadRing, op Operation, minStability, maxWaiting time.Duration, isChanged func(ReplicationSet, ReplicationSet) bool) error { |
| 77 | // Configure the max waiting time as a context deadline. |
| 78 | ctx, cancel := context.WithTimeout(ctx, maxWaiting) |
| 79 | defer cancel() |
| 80 | |
| 81 | // Get the initial ring state. |
| 82 | ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck |
| 83 | ringLastStateTs := time.Now() |
| 84 | |
| 85 | const pollingFrequency = time.Second |
| 86 | pollingTicker := time.NewTicker(pollingFrequency) |
| 87 | defer pollingTicker.Stop() |
| 88 | |
| 89 | for { |
| 90 | select { |
| 91 | case <-ctx.Done(): |
| 92 | return ctx.Err() |
| 93 | case <-pollingTicker.C: |
| 94 | // We ignore the error because in case of error it will return an empty |
| 95 | // replication set which we use to compare with the previous state. |
| 96 | currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck |
| 97 | |
| 98 | if isChanged(ringLastState, currRingState) { |
| 99 | ringLastState = currRingState |
| 100 | ringLastStateTs = time.Now() |
| 101 | } else if time.Since(ringLastStateTs) >= minStability { |
| 102 | return nil |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | } |
| 107 | |
| 108 | // MakeBuffersForGet returns buffers to use with Ring.Get(). |
| 109 | func MakeBuffersForGet() (bufDescs []InstanceDesc, bufHosts, bufZones []string) { |
no test coverage detected