MCPcopy
hub / github.com/grafana/dskit / TestPartitionInstanceLifecycler

Function TestPartitionInstanceLifecycler

ring/partition_instance_lifecycler_test.go:21–342  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

19)
20
21func TestPartitionInstanceLifecycler(t *testing.T) {
22 const eventuallyTick = 10 * time.Millisecond
23
24 ctx := context.Background()
25 logger := log.NewNopLogger()
26
27 t.Run("should wait for the configured minimum number of owners before switching a pending partition to active", func(t *testing.T) {
28 t.Parallel()
29
30 lifecycler1aConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-a-1")
31 lifecycler1bConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-b-1")
32 for _, cfg := range []*PartitionInstanceLifecyclerConfig{&lifecycler1aConfig, &lifecycler1bConfig} {
33 cfg.WaitOwnersCountOnPending = 2
34 cfg.WaitOwnersDurationOnPending = 0
35 }
36
37 store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil)
38 t.Cleanup(func() { assert.NoError(t, closer.Close()) })
39
40 // Start instance-zone-a-1 lifecycler.
41 lifecycler1a := NewPartitionInstanceLifecycler(lifecycler1aConfig, "test", ringKey, store, logger, nil)
42 require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1a))
43 t.Cleanup(func() {
44 require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1a))
45 })
46
47 // We expect the partition to NOT switch to active.
48 time.Sleep(2 * lifecycler1aConfig.PollingInterval)
49
50 actual := getPartitionRingFromStore(t, store, ringKey)
51 assert.Len(t, actual.Partitions, 1)
52 assert.True(t, actual.HasPartition(1))
53 assert.Equal(t, PartitionPending, actual.Partitions[1].State)
54 assert.ElementsMatch(t, []string{"instance-zone-a-1"}, actual.ownersByPartition()[1])
55
56 // Start instance-zone-b-1 lifecycler.
57 lifecycler1b := NewPartitionInstanceLifecycler(lifecycler1bConfig, "test", ringKey, store, logger, nil)
58 require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1b))
59 t.Cleanup(func() {
60 require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1b))
61 })
62
63 actual = getPartitionRingFromStore(t, store, ringKey)
64 assert.ElementsMatch(t, []string{"instance-zone-a-1", "instance-zone-b-1"}, actual.ownersByPartition()[1])
65
66 // We expect the partition to switch to active state.
67 assert.Eventually(t, func() bool {
68 actual := getPartitionRingFromStore(t, store, ringKey)
69 return actual.Partitions[1].State == PartitionActive
70 }, 3*time.Second, eventuallyTick)
71 })
72
73 t.Run("should wait for the configured minimum waiting time before switching a pending partition to active", func(t *testing.T) {
74 t.Parallel()
75
76 lifecyclerConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-1")
77 lifecyclerConfig.WaitOwnersCountOnPending = 1
78 lifecyclerConfig.WaitOwnersDurationOnPending = 2 * time.Second

Callers

nothing calls this directly

Tested by

no test coverage detected