(t *testing.T)
| 19 | ) |
| 20 | |
| 21 | func TestPartitionInstanceLifecycler(t *testing.T) { |
| 22 | const eventuallyTick = 10 * time.Millisecond |
| 23 | |
| 24 | ctx := context.Background() |
| 25 | logger := log.NewNopLogger() |
| 26 | |
| 27 | t.Run("should wait for the configured minimum number of owners before switching a pending partition to active", func(t *testing.T) { |
| 28 | t.Parallel() |
| 29 | |
| 30 | lifecycler1aConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-a-1") |
| 31 | lifecycler1bConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-zone-b-1") |
| 32 | for _, cfg := range []*PartitionInstanceLifecyclerConfig{&lifecycler1aConfig, &lifecycler1bConfig} { |
| 33 | cfg.WaitOwnersCountOnPending = 2 |
| 34 | cfg.WaitOwnersDurationOnPending = 0 |
| 35 | } |
| 36 | |
| 37 | store, closer := consul.NewInMemoryClient(GetPartitionRingCodec(), log.NewNopLogger(), nil) |
| 38 | t.Cleanup(func() { assert.NoError(t, closer.Close()) }) |
| 39 | |
| 40 | // Start instance-zone-a-1 lifecycler. |
| 41 | lifecycler1a := NewPartitionInstanceLifecycler(lifecycler1aConfig, "test", ringKey, store, logger, nil) |
| 42 | require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1a)) |
| 43 | t.Cleanup(func() { |
| 44 | require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1a)) |
| 45 | }) |
| 46 | |
| 47 | // We expect the partition to NOT switch to active. |
| 48 | time.Sleep(2 * lifecycler1aConfig.PollingInterval) |
| 49 | |
| 50 | actual := getPartitionRingFromStore(t, store, ringKey) |
| 51 | assert.Len(t, actual.Partitions, 1) |
| 52 | assert.True(t, actual.HasPartition(1)) |
| 53 | assert.Equal(t, PartitionPending, actual.Partitions[1].State) |
| 54 | assert.ElementsMatch(t, []string{"instance-zone-a-1"}, actual.ownersByPartition()[1]) |
| 55 | |
| 56 | // Start instance-zone-b-1 lifecycler. |
| 57 | lifecycler1b := NewPartitionInstanceLifecycler(lifecycler1bConfig, "test", ringKey, store, logger, nil) |
| 58 | require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1b)) |
| 59 | t.Cleanup(func() { |
| 60 | require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler1b)) |
| 61 | }) |
| 62 | |
| 63 | actual = getPartitionRingFromStore(t, store, ringKey) |
| 64 | assert.ElementsMatch(t, []string{"instance-zone-a-1", "instance-zone-b-1"}, actual.ownersByPartition()[1]) |
| 65 | |
| 66 | // We expect the partition to switch to active state. |
| 67 | assert.Eventually(t, func() bool { |
| 68 | actual := getPartitionRingFromStore(t, store, ringKey) |
| 69 | return actual.Partitions[1].State == PartitionActive |
| 70 | }, 3*time.Second, eventuallyTick) |
| 71 | }) |
| 72 | |
| 73 | t.Run("should wait for the configured minimum waiting time before switching a pending partition to active", func(t *testing.T) { |
| 74 | t.Parallel() |
| 75 | |
| 76 | lifecyclerConfig := createTestPartitionInstanceLifecyclerConfig(1, "instance-1") |
| 77 | lifecyclerConfig.WaitOwnersCountOnPending = 1 |
| 78 | lifecyclerConfig.WaitOwnersDurationOnPending = 2 * time.Second |
nothing calls this directly
no test coverage detected