(identifier string, size int, lookbackPeriod time.Duration, now time.Time)
| 229 | } |
| 230 | |
| 231 | func (r *PartitionRing) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) (*PartitionRing, error) { |
| 232 | // If the size is too small or too large, run with a size equal to the total number of partitions. |
| 233 | // We have to run the function anyway because the logic may filter out some INACTIVE partitions. |
| 234 | if size <= 0 || size >= len(r.desc.Partitions) { |
| 235 | size = len(r.desc.Partitions) |
| 236 | } |
| 237 | |
| 238 | var lookbackUntil int64 |
| 239 | if lookbackPeriod > 0 { |
| 240 | lookbackUntil = now.Add(-lookbackPeriod).Unix() |
| 241 | } |
| 242 | |
| 243 | // Initialise the random generator used to select instances in the ring. |
| 244 | // There are no zones |
| 245 | random := rand.New(rand.NewSource(shardUtil.ShuffleShardSeed(identifier, ""))) |
| 246 | |
| 247 | // To select one more instance while guaranteeing the "consistency" property, |
| 248 | // we do pick a random value from the generator and resolve uniqueness collisions |
| 249 | // (if any) continuing walking the ring. |
| 250 | tokensCount := len(r.ringTokens) |
| 251 | |
| 252 | result := make(map[int32]struct{}, size) |
| 253 | exclude := map[int32]struct{}{} |
| 254 | |
| 255 | for len(result) < size { |
| 256 | start := searchToken(r.ringTokens, random.Uint32()) |
| 257 | iterations := 0 |
| 258 | found := false |
| 259 | |
| 260 | for p := start; !found && iterations < tokensCount; p++ { |
| 261 | iterations++ |
| 262 | |
| 263 | // Wrap p around in the ring. |
| 264 | if p >= tokensCount { |
| 265 | p %= tokensCount |
| 266 | } |
| 267 | |
| 268 | pid, ok := r.partitionByToken[Token(r.ringTokens[p])] |
| 269 | if !ok { |
| 270 | return nil, ErrInconsistentTokensInfo |
| 271 | } |
| 272 | |
| 273 | // Ensure the partition has not already been included or excluded. |
| 274 | if _, ok := result[pid]; ok { |
| 275 | continue |
| 276 | } |
| 277 | if _, ok := exclude[pid]; ok { |
| 278 | continue |
| 279 | } |
| 280 | |
| 281 | p, ok := r.desc.Partitions[pid] |
| 282 | if !ok { |
| 283 | return nil, ErrInconsistentTokensInfo |
| 284 | } |
| 285 | |
| 286 | // PENDING partitions should be skipped because they're not ready for read or write yet, |
| 287 | // and they don't need to be looked back. |
| 288 | if p.IsPending() { |
no test coverage detected