(t *testing.T)
| 3605 | } |
| 3606 | |
| 3607 | func TestRing_ShuffleShardWithLookback_CachingConcurrency(t *testing.T) { |
| 3608 | const ( |
| 3609 | numWorkers = 10 |
| 3610 | numRequestsPerWorker = 1000 |
| 3611 | ) |
| 3612 | |
| 3613 | now := time.Now() |
| 3614 | cfg := Config{KVStore: kv.Config{}, ReplicationFactor: 1, ZoneAwarenessEnabled: true} |
| 3615 | registry := prometheus.NewRegistry() |
| 3616 | ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), registry, log.NewNopLogger()) |
| 3617 | require.NoError(t, err) |
| 3618 | |
| 3619 | gen := initTokenGenerator(t) |
| 3620 | |
| 3621 | // Add some instances to the ring. |
| 3622 | ringDesc := &Desc{Ingesters: map[string]InstanceDesc{ |
| 3623 | "instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), |
| 3624 | "instance-2": makeReadOnly(generateRingInstanceWithInfo("instance-2", "zone-a", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), now), |
| 3625 | "instance-3": generateRingInstanceWithInfo("instance-3", "zone-b", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), |
| 3626 | "instance-4": generateRingInstanceWithInfo("instance-4", "zone-b", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), |
| 3627 | "instance-5": generateRingInstanceWithInfo("instance-5", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), |
| 3628 | "instance-6": makeReadOnly(generateRingInstanceWithInfo("instance-6", "zone-c", gen.GenerateTokens(128, nil), now.Add(-2*time.Hour)), now.Add(-2*time.Hour)), |
| 3629 | }} |
| 3630 | |
| 3631 | ring.updateRingState(ringDesc) |
| 3632 | |
| 3633 | for _, shardSize := range []int{3, 0} { |
| 3634 | t.Run(fmt.Sprintf("shardSize=%d", shardSize), func(t *testing.T) { |
| 3635 | // Start the workers. |
| 3636 | wg := sync.WaitGroup{} |
| 3637 | wg.Add(numWorkers) |
| 3638 | |
| 3639 | for w := 0; w < numWorkers; w++ { |
| 3640 | go func(workerID int) { |
| 3641 | defer wg.Done() |
| 3642 | |
| 3643 | // Get the subring once. This is the one expected from subsequent requests. |
| 3644 | userID := fmt.Sprintf("user-%d", workerID) |
| 3645 | expected := ring.ShuffleShardWithLookback(userID, shardSize, time.Hour, now) |
| 3646 | |
| 3647 | for r := 0; r < numRequestsPerWorker; r++ { |
| 3648 | actual := ring.ShuffleShardWithLookback(userID, shardSize, time.Hour, now) |
| 3649 | require.Equal(t, expected, actual) |
| 3650 | |
| 3651 | // Get the subring for a new user each time too, in order to stress the setter too |
| 3652 | // (if we only read from the cache there's no read/write concurrent access). |
| 3653 | ring.ShuffleShardWithLookback(fmt.Sprintf("stress-%d", r), shardSize, time.Hour, now) |
| 3654 | } |
| 3655 | }(w) |
| 3656 | } |
| 3657 | |
| 3658 | // Wait until all workers have done. |
| 3659 | wg.Wait() |
| 3660 | }) |
| 3661 | } |
| 3662 | } |
| 3663 | |
| 3664 | func BenchmarkRing_ShuffleShard(b *testing.B) { |
nothing calls this directly
no test coverage detected