(t *testing.T)
| 2365 | } |
| 2366 | |
| 2367 | func TestMemberlist_WatchPrefix(t *testing.T) { |
| 2368 | t.Run("buffer size configuration", func(t *testing.T) { |
| 2369 | // Test custom buffer size |
| 2370 | var cfg KVConfig |
| 2371 | flagext.DefaultValues(&cfg) |
| 2372 | cfg.WatchPrefixBufferSize = 10 |
| 2373 | kv := NewKV(cfg, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry()) |
| 2374 | require.Equal(t, 10, kv.cfg.WatchPrefixBufferSize) |
| 2375 | |
| 2376 | // Test invalid buffer size is rejected |
| 2377 | kvInit := NewKVInitService(&KVConfig{WatchPrefixBufferSize: -3}, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry()) |
| 2378 | _, err := kvInit.GetMemberlistKV() |
| 2379 | require.Error(t, err) |
| 2380 | require.Contains(t, err.Error(), "invalid WatchPrefixBufferSize") |
| 2381 | |
| 2382 | kvInit = NewKVInitService(&KVConfig{WatchPrefixBufferSize: 0}, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry()) |
| 2383 | _, err = kvInit.GetMemberlistKV() |
| 2384 | require.Error(t, err) |
| 2385 | require.Contains(t, err.Error(), "invalid WatchPrefixBufferSize") |
| 2386 | }) |
| 2387 | |
| 2388 | t.Run("buffering behavior", func(t *testing.T) { |
| 2389 | // Create KV with small buffer and fast notification interval |
| 2390 | var cfg KVConfig |
| 2391 | flagext.DefaultValues(&cfg) |
| 2392 | cfg.WatchPrefixBufferSize = 2 |
| 2393 | // Disable notification batching to get immediate notifications |
| 2394 | cfg.NotifyInterval = 0 |
| 2395 | kv := NewKV(cfg, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry()) |
| 2396 | require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) |
| 2397 | defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck |
| 2398 | |
| 2399 | // Setup test data |
| 2400 | codec := dataCodec{} |
| 2401 | prefix := "test_prefix_" |
| 2402 | keys := []string{prefix + "1", prefix + "2", prefix + "3"} |
| 2403 | |
| 2404 | // Collect the keys passed to the watch callback in a mutex-protected slice; done is closed once len(keys) notifications have been recorded |
| 2405 | notifications := make([]string, 0) |
| 2406 | mu := sync.Mutex{} // Protect notifications slice |
| 2407 | done := make(chan struct{}) |
| 2408 | |
| 2409 | ctx, cancel := context.WithCancel(context.Background()) |
| 2410 | defer cancel() |
| 2411 | |
| 2412 | // Start watching prefix |
| 2413 | go func() { |
| 2414 | kv.WatchPrefix(ctx, prefix, codec, func(key string, _ any) bool { |
| 2415 | mu.Lock() |
| 2416 | notifications = append(notifications, key) |
| 2417 | count := len(notifications) |
| 2418 | mu.Unlock() |
| 2419 | |
| 2420 | if count == len(keys) { |
| 2421 | close(done) |
| 2422 | } |
| 2423 | return true // keep watching |
| 2424 | }) |
nothing calls this directly
no test coverage detected