MCPcopy
hub / github.com/grafana/dskit / WatchPrefix

Method WatchPrefix

kv/consul/client.go:274–332  ·  view source on GitHub ↗

WatchPrefix will watch a given prefix in Consul for new keys and changes to existing keys under that prefix. When the value under such a key changes, the f callback is called with the key and the deserialised value. Values in Consul are assumed to be JSON. This function blocks until the context is cancelled or the f callback returns false.

(ctx context.Context, prefix string, f func(string, interface{}) bool)

Source from the content-addressed store, hash-verified

272// When the value under said key changes, the f callback is called with the deserialised value.
273// Values in Consul are assumed to be JSON. This function blocks until the context is cancelled or f returns false.
274func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
275 var (
276 backoff = backoff.New(ctx, backoffConfig)
277 index = uint64(0)
278 limiter = c.createRateLimiter()
279 )
280 for backoff.Ongoing() {
281 err := limiter.Wait(ctx)
282 if err != nil {
283 if errors.Is(err, context.Canceled) {
284 break
285 }
286 level.Error(c.logger).Log("msg", "error while rate-limiting", "prefix", prefix, "err", err)
287 backoff.Wait()
288 continue
289 }
290
291 queryOptions := &consul.QueryOptions{
292 AllowStale: !c.cfg.ConsistentReads,
293 RequireConsistent: c.cfg.ConsistentReads,
294 WaitIndex: index,
295 WaitTime: longPollDuration,
296 }
297
298 kvps, meta, err := c.kv.List(prefix, queryOptions.WithContext(ctx))
299 // kvps being nil here is not an error -- quite the opposite. Consul returns index,
300 // which makes next query blocking, so there is no need to detect this and act on it.
301 if err != nil {
302 level.Error(c.logger).Log("msg", "error getting path", "prefix", prefix, "err", err)
303 backoff.Wait()
304 continue
305 }
306 backoff.Reset()
307
308 newIndex, skip := checkLastIndex(index, meta.LastIndex)
309 if skip {
310 continue
311 }
312
313 for _, kvp := range kvps {
314 // We asked for values newer than 'index', but Consul returns all values below given prefix,
315 // even those that haven't changed. We don't need to report all of them as updated.
316 if index > 0 && kvp.ModifyIndex <= index && kvp.CreateIndex <= index {
317 continue
318 }
319
320 out, err := c.codec.Decode(kvp.Value)
321 if err != nil {
322 level.Error(c.logger).Log("msg", "error decoding list of values for prefix:key", "prefix", prefix, "key", kvp.Key, "err", err)
323 continue
324 }
325 if !f(kvp.Key, out) {
326 return
327 }
328 }
329
330 index = newIndex
331 }

Callers

nothing calls this directly

Calls 11

createRateLimiterMethod · 0.95
NewFunction · 0.92
checkLastIndexFunction · 0.85
OngoingMethod · 0.80
IsMethod · 0.80
ListMethod · 0.65
DecodeMethod · 0.65
WaitMethod · 0.45
LogMethod · 0.45
ErrorMethod · 0.45
ResetMethod · 0.45

Tested by

no test coverage detected