MCPcopy
hub / github.com/grafana/dskit / WatchPrefix

Method WatchPrefix

kv/etcd/etcd.go:229–267  ·  kv/etcd/etcd.go::Client.WatchPrefix

WatchPrefix implements kv.Client.

(ctx context.Context, key string, f func(string, interface{}) bool)

Source from the content-addressed store, hash-verified

227
228// WatchPrefix implements kv.Client.
229func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) {
230 backoff := backoff.New(ctx, backoff.Config{
231 MinBackoff: 1 * time.Second,
232 MaxBackoff: 1 * time.Minute,
233 })
234
235 // Ensure the context used by the Watch is always cancelled.
236 watchCtx, cancel := context.WithCancel(ctx)
237 defer cancel()
238
239outer:
240 for backoff.Ongoing() {
241 for resp := range c.cli.Watch(watchCtx, key, clientv3.WithPrefix()) {
242 if err := resp.Err(); err != nil {
243 level.Error(c.logger).Log("msg", "watch error", "key", key, "err", err)
244 continue outer
245 }
246
247 backoff.Reset()
248
249 for _, event := range resp.Events {
250 if event.Kv.Version == 0 && event.Kv.Value == nil {
251 // Delete notification. Since not all KV store clients (and codecs) support this, we ignore it.
252 continue
253 }
254
255 out, err := c.codec.Decode(event.Kv.Value)
256 if err != nil {
257 level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
258 continue
259 }
260
261 if !f(string(event.Kv.Key), out) {
262 return
263 }
264 }
265 }
266 }
267}
268
269// List implements kv.Client.
270func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {

Callers

nothing calls this directly

Calls 8

NewFunction · 0.92
OngoingMethod · 0.80
DecodeMethod · 0.65
WatchMethod · 0.45
ErrMethod · 0.45
LogMethod · 0.45
ErrorMethod · 0.45
ResetMethod · 0.45

Tested by

no test coverage detected