MCPcopy
hub / github.com/grafana/dskit / WatchKey

Method WatchKey

kv/memberlist/memberlist_client.go:1062–1101  ·  view source on GitHub ↗

WatchKey watches for value changes for given key. When value changes, 'f' function is called with the latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call. Watching ends when 'f' returns false, context is done, or this client is shut down.

(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool)

Source from the content-addressed store, hash-verified

1060//
1061// Watching ends when 'f' returns false, context is done, or this client is shut down.
1062func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) {
1063 // keep one extra notification, to avoid missing notification if we're busy running the function
1064 w := make(chan string, 1)
1065
1066 // register watcher
1067 m.watchersMu.Lock()
1068 m.watchers[key] = append(m.watchers[key], w)
1069 m.watchersMu.Unlock()
1070
1071 defer func() {
1072 // unregister watcher on exit
1073 m.watchersMu.Lock()
1074 defer m.watchersMu.Unlock()
1075
1076 removeWatcherChannel(key, w, m.watchers)
1077 }()
1078
1079 for {
1080 select {
1081 case <-w:
1082 // value changed
1083 val, _, err := m.get(key, codec)
1084 if err != nil {
1085 level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err)
1086 continue
1087 }
1088
1089 if !f(val) {
1090 return
1091 }
1092
1093 case <-m.shutdown:
1094 // stop watching on shutdown
1095 return
1096
1097 case <-ctx.Done():
1098 return
1099 }
1100 }
1101}
1102
1103// WatchPrefix watches for any change of values stored under keys with given prefix. When change occurs,
1104// function 'f' is called with key and current value.

Calls 4

getMethod · 0.95
removeWatcherChannelFunction · 0.85
DoneMethod · 0.65
LogMethod · 0.45