WatchPrefix will watch a given prefix in Consul for new keys and changes to existing keys under that prefix. When the value under said key changes, the f callback is called with the deserialised value. Values in Consul are assumed to be JSON. This function blocks until the context is cancelled.
(ctx context.Context, prefix string, f func(string, interface{}) bool)
| 272 | // When the value under said key changes, the f callback is called with the deserialised value. |
| 273 | // Values in Consul are assumed to be JSON. This function blocks until the context is cancelled. |
| 274 | func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { |
| 275 | var ( |
| 276 | backoff = backoff.New(ctx, backoffConfig) |
| 277 | index = uint64(0) |
| 278 | limiter = c.createRateLimiter() |
| 279 | ) |
| 280 | for backoff.Ongoing() { |
| 281 | err := limiter.Wait(ctx) |
| 282 | if err != nil { |
| 283 | if errors.Is(err, context.Canceled) { |
| 284 | break |
| 285 | } |
| 286 | level.Error(c.logger).Log("msg", "error while rate-limiting", "prefix", prefix, "err", err) |
| 287 | backoff.Wait() |
| 288 | continue |
| 289 | } |
| 290 | |
| 291 | queryOptions := &consul.QueryOptions{ |
| 292 | AllowStale: !c.cfg.ConsistentReads, |
| 293 | RequireConsistent: c.cfg.ConsistentReads, |
| 294 | WaitIndex: index, |
| 295 | WaitTime: longPollDuration, |
| 296 | } |
| 297 | |
| 298 | kvps, meta, err := c.kv.List(prefix, queryOptions.WithContext(ctx)) |
| 299 | // kvps being nil here is not an error -- quite the opposite. Consul returns index, |
| 300 | // which makes next query blocking, so there is no need to detect this and act on it. |
| 301 | if err != nil { |
| 302 | level.Error(c.logger).Log("msg", "error getting path", "prefix", prefix, "err", err) |
| 303 | backoff.Wait() |
| 304 | continue |
| 305 | } |
| 306 | backoff.Reset() |
| 307 | |
| 308 | newIndex, skip := checkLastIndex(index, meta.LastIndex) |
| 309 | if skip { |
| 310 | continue |
| 311 | } |
| 312 | |
| 313 | for _, kvp := range kvps { |
| 314 | // We asked for values newer than 'index', but Consul returns all values below given prefix, |
| 315 | // even those that haven't changed. We don't need to report all of them as updated. |
| 316 | if index > 0 && kvp.ModifyIndex <= index && kvp.CreateIndex <= index { |
| 317 | continue |
| 318 | } |
| 319 | |
| 320 | out, err := c.codec.Decode(kvp.Value) |
| 321 | if err != nil { |
| 322 | level.Error(c.logger).Log("msg", "error decoding list of values for prefix:key", "prefix", prefix, "key", kvp.Key, "err", err) |
| 323 | continue |
| 324 | } |
| 325 | if !f(kvp.Key, out) { |
| 326 | return |
| 327 | } |
| 328 | } |
| 329 | |
| 330 | index = newIndex |
| 331 | } |