MCPcopy
hub / github.com/grafana/dskit / shuffleShard

Method shuffleShard

ring/partition_ring.go:231–325  ·  view source on GitHub ↗
(identifier string, size int, lookbackPeriod time.Duration, now time.Time)

Source from the content-addressed store, hash-verified

229}
230
231func (r *PartitionRing) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) (*PartitionRing, error) {
232 // If the size is too small or too large, run with a size equal to the total number of partitions.
233 // We have to run the function anyway because the logic may filter out some INACTIVE partitions.
234 if size <= 0 || size >= len(r.desc.Partitions) {
235 size = len(r.desc.Partitions)
236 }
237
238 var lookbackUntil int64
239 if lookbackPeriod > 0 {
240 lookbackUntil = now.Add(-lookbackPeriod).Unix()
241 }
242
243 // Initialise the random generator used to select instances in the ring.
244 // There are no zones
245 random := rand.New(rand.NewSource(shardUtil.ShuffleShardSeed(identifier, "")))
246
247 // To select one more instance while guaranteeing the "consistency" property,
248 // we pick a random value from the generator and resolve uniqueness collisions
249 // (if any) by continuing to walk the ring.
250 tokensCount := len(r.ringTokens)
251
252 result := make(map[int32]struct{}, size)
253 exclude := map[int32]struct{}{}
254
255 for len(result) < size {
256 start := searchToken(r.ringTokens, random.Uint32())
257 iterations := 0
258 found := false
259
260 for p := start; !found && iterations < tokensCount; p++ {
261 iterations++
262
263 // Wrap p around in the ring.
264 if p >= tokensCount {
265 p %= tokensCount
266 }
267
268 pid, ok := r.partitionByToken[Token(r.ringTokens[p])]
269 if !ok {
270 return nil, ErrInconsistentTokensInfo
271 }
272
273 // Ensure the partition has not already been included or excluded.
274 if _, ok := result[pid]; ok {
275 continue
276 }
277 if _, ok := exclude[pid]; ok {
278 continue
279 }
280
281 p, ok := r.desc.Partitions[pid]
282 if !ok {
283 return nil, ErrInconsistentTokensInfo
284 }
285
286 // PENDING partitions should be skipped because they're not ready for read or write yet,
287 // and they don't need to be looked back.
288 if p.IsPending() {

Callers 2

ShuffleShard · Method · 0.95

Calls 8

searchToken · Function · 0.85
Token · TypeAlias · 0.85
IsPending · Method · 0.80
GetStateTimestamp · Method · 0.80
IsActive · Method · 0.80
WithPartitions · Method · 0.80
Add · Method · 0.65

Tested by

no test coverage detected