notifyWatcherSync immediately sends notification to all watchers of given key.
(key string)
| 1219 | |
| 1220 | // notifyWatcherSync immediately sends notification to all watchers of given key. |
| 1221 | func (m *KV) notifyWatchersSync(key string) { |
| 1222 | m.watchersMu.Lock() |
| 1223 | defer m.watchersMu.Unlock() |
| 1224 | |
| 1225 | for _, kw := range m.watchers[key] { |
| 1226 | select { |
| 1227 | case kw <- key: |
| 1228 | // notification sent. |
| 1229 | default: |
| 1230 | // cannot send notification to this watcher at the moment |
| 1231 | // but since this is a buffered channel, it means that |
| 1232 | // there is already a pending notification anyway |
| 1233 | } |
| 1234 | } |
| 1235 | |
| 1236 | for p, ws := range m.prefixWatchers { |
| 1237 | if strings.HasPrefix(key, p) { |
| 1238 | for _, pw := range ws { |
| 1239 | select { |
| 1240 | case pw <- key: |
| 1241 | // notification sent. |
| 1242 | default: |
| 1243 | c, _ := m.watchPrefixDroppedNotifications.GetMetricWithLabelValues(p) |
| 1244 | if c != nil { |
| 1245 | c.Inc() |
| 1246 | } |
| 1247 | |
| 1248 | level.Warn(m.logger).Log("msg", "failed to send notification to prefix watcher", "prefix", p) |
| 1249 | } |
| 1250 | } |
| 1251 | } |
| 1252 | } |
| 1253 | } |
| 1254 | |
| 1255 | func (m *KV) Delete(key string) error { |
| 1256 | m.storeMu.Lock() |
no test coverage detected