WatchKey will watch a given key in consul for changes. When the value under said key changes, the f callback is called with the deserialised value. To construct the deserialised value, a factory function should be supplied which generates an empty struct for WatchKey to deserialise into. This functi
(ctx context.Context, key string, f func(interface{}) bool)
| 212 | // supplied which generates an empty struct for WatchKey to deserialise |
| 213 | // into. This function blocks until the context is cancelled or f returns false. |
| 214 | func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { |
| 215 | var ( |
| 216 | backoff = backoff.New(ctx, backoffConfig) |
| 217 | index = uint64(0) |
| 218 | limiter = c.createRateLimiter() |
| 219 | ) |
| 220 | |
| 221 | for backoff.Ongoing() { |
| 222 | err := limiter.Wait(ctx) |
| 223 | if err != nil { |
| 224 | if errors.Is(err, context.Canceled) { |
| 225 | break |
| 226 | } |
| 227 | level.Error(c.logger).Log("msg", "error while rate-limiting", "key", key, "err", err) |
| 228 | backoff.Wait() |
| 229 | continue |
| 230 | } |
| 231 | |
| 232 | queryOptions := &consul.QueryOptions{ |
| 233 | AllowStale: !c.cfg.ConsistentReads, |
| 234 | RequireConsistent: c.cfg.ConsistentReads, |
| 235 | WaitIndex: index, |
| 236 | WaitTime: longPollDuration, |
| 237 | } |
| 238 | |
| 239 | kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx)) |
| 240 | // Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value, |
| 241 | // and next call to Get will block as expected. We handle missing value below. |
| 242 | if err != nil { |
| 243 | level.Error(c.logger).Log("msg", "error getting path", "key", key, "err", err) |
| 244 | backoff.Wait() |
| 245 | continue |
| 246 | } |
| 247 | backoff.Reset() |
| 248 | |
| 249 | skip := false |
| 250 | index, skip = checkLastIndex(index, meta.LastIndex) |
| 251 | if skip { |
| 252 | continue |
| 253 | } |
| 254 | |
| 255 | if kvp == nil { |
| 256 | level.Debug(c.logger).Log("msg", "value is nil", "key", key, "index", index) |
| 257 | continue |
| 258 | } |
| 259 | |
| 260 | out, err := c.codec.Decode(kvp.Value) |
| 261 | if err != nil { |
| 262 | level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) |
| 263 | continue |
| 264 | } |
| 265 | if !f(out) { |
| 266 | return |
| 267 | } |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | // WatchPrefix will watch a given prefix in Consul for new keys and changes to existing keys under that prefix. |