MCPcopy
hub / github.com/grafana/dskit / WatchKey

Method WatchKey

kv/consul/client.go:214–269  ·  view source on GitHub ↗

WatchKey will watch a given key in consul for changes. When the value under said key changes, the f callback is called with the deserialised value. The value is deserialised with the client's configured codec, so no factory function needs to be supplied. This function blocks until the context is cancelled or f returns false.

(ctx context.Context, key string, f func(interface{}) bool)

Source from the content-addressed store, hash-verified

212// supplied which generates an empty struct for WatchKey to deserialise
213// into. This function blocks until the context is cancelled or f returns false.
214func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
215 var (
216 backoff = backoff.New(ctx, backoffConfig)
217 index = uint64(0)
218 limiter = c.createRateLimiter()
219 )
220
221 for backoff.Ongoing() {
222 err := limiter.Wait(ctx)
223 if err != nil {
224 if errors.Is(err, context.Canceled) {
225 break
226 }
227 level.Error(c.logger).Log("msg", "error while rate-limiting", "key", key, "err", err)
228 backoff.Wait()
229 continue
230 }
231
232 queryOptions := &consul.QueryOptions{
233 AllowStale: !c.cfg.ConsistentReads,
234 RequireConsistent: c.cfg.ConsistentReads,
235 WaitIndex: index,
236 WaitTime: longPollDuration,
237 }
238
239 kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx))
240 // Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value,
241 // and next call to Get will block as expected. We handle missing value below.
242 if err != nil {
243 level.Error(c.logger).Log("msg", "error getting path", "key", key, "err", err)
244 backoff.Wait()
245 continue
246 }
247 backoff.Reset()
248
249 skip := false
250 index, skip = checkLastIndex(index, meta.LastIndex)
251 if skip {
252 continue
253 }
254
255 if kvp == nil {
256 level.Debug(c.logger).Log("msg", "value is nil", "key", key, "index", index)
257 continue
258 }
259
260 out, err := c.codec.Decode(kvp.Value)
261 if err != nil {
262 level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
263 continue
264 }
265 if !f(out) {
266 return
267 }
268 }
269}
270
271// WatchPrefix will watch a given prefix in Consul for new keys and changes to existing keys under that prefix.

Callers

nothing calls this directly

Calls 11

createRateLimiterMethod · 0.95
NewFunction · 0.92
checkLastIndexFunction · 0.85
OngoingMethod · 0.80
IsMethod · 0.80
GetMethod · 0.65
DecodeMethod · 0.65
WaitMethod · 0.45
LogMethod · 0.45
ErrorMethod · 0.45
ResetMethod · 0.45

Tested by

no test coverage detected