MCPcopy
hub / github.com/grafana/dskit / TestRing_ShuffleShardWithLookback_CachingConcurrency

Function TestRing_ShuffleShardWithLookback_CachingConcurrency

ring/ring_test.go:3607–3662  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

3605}
3606
3607func TestRing_ShuffleShardWithLookback_CachingConcurrency(t *testing.T) {
3608 const (
3609 numWorkers = 10
3610 numRequestsPerWorker = 1000
3611 )
3612
3613 now := time.Now()
3614 cfg := Config{KVStore: kv.Config{}, ReplicationFactor: 1, ZoneAwarenessEnabled: true}
3615 registry := prometheus.NewRegistry()
3616 ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), registry, log.NewNopLogger())
3617 require.NoError(t, err)
3618
3619 gen := initTokenGenerator(t)
3620
3621 // Add some instances to the ring.
3622 ringDesc := &Desc{Ingesters: map[string]InstanceDesc{
3623 "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)),
3624 "instance-2": makeReadOnly(generateRingInstanceWithInfo("instance-2", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), now),
3625 "instance-3": generateRingInstanceWithInfo("instance-3", "zone-b", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)),
3626 "instance-4": generateRingInstanceWithInfo("instance-4", "zone-b", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)),
3627 "instance-5": generateRingInstanceWithInfo("instance-5", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)),
3628 "instance-6": makeReadOnly(generateRingInstanceWithInfo("instance-6", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), now.Add(-2*time.Hour)),
3629 }}
3630
3631 ring.updateRingState(ringDesc)
3632
3633 for _, shardSize := range []int{3, 0} {
3634 t.Run(fmt.Sprintf("shardSize=%d", shardSize), func(t *testing.T) {
3635 // Start the workers.
3636 wg := sync.WaitGroup{}
3637 wg.Add(numWorkers)
3638
3639 for w := 0; w < numWorkers; w++ {
3640 go func(workerID int) {
3641 defer wg.Done()
3642
3643 // Get the subring once. This is the one expected from subsequent requests.
3644 userID := fmt.Sprintf("user-%d", workerID)
3645 expected := ring.ShuffleShardWithLookback(userID, shardSize, time.Hour, now)
3646
3647 for r := 0; r < numRequestsPerWorker; r++ {
3648 actual := ring.ShuffleShardWithLookback(userID, shardSize, time.Hour, now)
3649 require.Equal(t, expected, actual)
3650
3651 // Get the subring for a new user each time too, in order to stress the setter too
3652 // (if we only read from the cache there's no read/write concurrent access).
3653 ring.ShuffleShardWithLookback(fmt.Sprintf("stress-%d", r), shardSize, time.Hour, now)
3654 }
3655 }(w)
3656 }
3657
3658 // Wait until all workers have done.
3659 wg.Wait()
3660 })
3661 }
3662}
3663
3664func BenchmarkRing_ShuffleShard(b *testing.B) {

Callers

nothing calls this directly

Calls 13

initTokenGeneratorFunction · 0.85
makeReadOnlyFunction · 0.85
updateRingStateMethod · 0.80
RunMethod · 0.80
GenerateTokensMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
EqualMethod · 0.45

Tested by

no test coverage detected