WatchKey watches for changes to the value stored under the given key. When the value changes, function 'f' is called with the latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent call to 'f'. Watching ends when 'f' returns false, the context is done, or this client is shut down.
(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool)
| 1060 | // |
| 1061 | // Watching ends when 'f' returns false, context is done, or this client is shut down. |
| 1062 | func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool) { |
| 1063 | // Notification channel with a buffer of one: it can hold a single pending notification, so a change signalled while 'f' is still running is not missed and is handled on the next loop iteration. (The sending side is not visible in this function.) |
| 1064 | w := make(chan string, 1) |
| 1065 | |
| 1066 | // Register the channel as a watcher for this key. The m.watchers map is only touched while holding m.watchersMu. |
| 1067 | m.watchersMu.Lock() |
| 1068 | m.watchers[key] = append(m.watchers[key], w) |
| 1069 | m.watchersMu.Unlock() |
| 1070 | |
| 1071 | defer func() { |
| 1072 | // Unregister the watcher on every exit path ('f' returned false, shutdown, or context done). The lock is held for the whole removeWatcherChannel call and released by the inner defer. |
| 1073 | m.watchersMu.Lock() |
| 1074 | defer m.watchersMu.Unlock() |
| 1075 | |
| 1076 | removeWatcherChannel(key, w, m.watchers) |
| 1077 | }() |
| 1078 | |
| 1079 | for { |
| 1080 | select { |
| 1081 | case <-w: |
| 1082 | // A change was signalled (the string received from the channel is discarded). Fetch the current value via m.get; if that fails, the error is logged as a warning and this notification is skipped, but watching continues. Otherwise 'f' is called with the value and watching stops when it returns false. |
| 1083 | val, _, err := m.get(key, codec) |
| 1084 | if err != nil { |
| 1085 | level.Warn(m.logger).Log("msg", "failed to decode value while watching for changes", "key", key, "err", err) |
| 1086 | continue |
| 1087 | } |
| 1088 | |
| 1089 | if !f(val) { |
| 1090 | return |
| 1091 | } |
| 1092 | |
| 1093 | case <-m.shutdown: |
| 1094 | // m.shutdown became readable, i.e. this client is shutting down: stop watching. The deferred function above unregisters the watcher. |
| 1095 | return |
| 1096 | |
| 1097 | case <-ctx.Done(): |
| 1098 | return |
| 1099 | } |
| 1100 | } |
| 1101 | } |
| 1102 | |
| 1103 | // WatchPrefix watches for any change of values stored under keys with given prefix. When change occurs, |
| 1104 | // function 'f' is called with key and current value. |