MCPcopy
hub / github.com/grafana/dskit / TestMemberlist_WatchPrefix

Function TestMemberlist_WatchPrefix

kv/memberlist/memberlist_client_test.go:2367–2461  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2365}
2366
2367func TestMemberlist_WatchPrefix(t *testing.T) {
2368 t.Run("buffer size configuration", func(t *testing.T) {
2369 // Test custom buffer size
2370 var cfg KVConfig
2371 flagext.DefaultValues(&cfg)
2372 cfg.WatchPrefixBufferSize = 10
2373 kv := NewKV(cfg, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry())
2374 require.Equal(t, 10, kv.cfg.WatchPrefixBufferSize)
2375
2376 // Test invalid buffer size is rejected
2377 kvInit := NewKVInitService(&KVConfig{WatchPrefixBufferSize: -3}, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry())
2378 _, err := kvInit.GetMemberlistKV()
2379 require.Error(t, err)
2380 require.Contains(t, err.Error(), "invalid WatchPrefixBufferSize")
2381
2382 kvInit = NewKVInitService(&KVConfig{WatchPrefixBufferSize: 0}, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry())
2383 _, err = kvInit.GetMemberlistKV()
2384 require.Error(t, err)
2385 require.Contains(t, err.Error(), "invalid WatchPrefixBufferSize")
2386 })
2387
2388 t.Run("buffering behavior", func(t *testing.T) {
2389 // Create KV with small buffer and fast notification interval
2390 var cfg KVConfig
2391 flagext.DefaultValues(&cfg)
2392 cfg.WatchPrefixBufferSize = 2
2393 // Disable notification batching to get immediate notifications
2394 cfg.NotifyInterval = 0
2395 kv := NewKV(cfg, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry())
2396 require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
2397 defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck
2398
2399 // Setup test data
2400 codec := dataCodec{}
2401 prefix := "test_prefix_"
2402 keys := []string{prefix + "1", prefix + "2", prefix + "3"}
2403
2404 // Create a channel to receive notifications
2405 notifications := make([]string, 0)
2406 mu := sync.Mutex{} // Protect notifications slice
2407 done := make(chan struct{})
2408
2409 ctx, cancel := context.WithCancel(context.Background())
2410 defer cancel()
2411
2412 // Start watching prefix
2413 go func() {
2414 kv.WatchPrefix(ctx, prefix, codec, func(key string, _ any) bool {
2415 mu.Lock()
2416 notifications = append(notifications, key)
2417 count := len(notifications)
2418 mu.Unlock()
2419
2420 if count == len(keys) {
2421 close(done)
2422 }
2423 return true // keep watching
2424 })

Callers

nothing calls this directly

Calls 15

GetMemberlistKVMethod · 0.95
WatchPrefixMethod · 0.95
CASMethod · 0.95
DefaultValuesFunction · 0.92
StartAndAwaitRunningFunction · 0.92
StopAndAwaitTerminatedFunction · 0.92
NewKVFunction · 0.85
NewKVInitServiceFunction · 0.85
getOrCreateDataFunction · 0.85
RunMethod · 0.80
FatalMethod · 0.80
SleepMethod · 0.65

Tested by

no test coverage detected