MCPcopy
hub / github.com/grafana/dskit / WatchKey

Method WatchKey

kv/etcd/etcd.go:193–226  ·  view source on GitHub ↗

WatchKey implements kv.Client.

(ctx context.Context, key string, f func(interface{}) bool)

Source from the content-addressed store, hash-verified

191
192// WatchKey implements kv.Client.
193func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
194 backoff := backoff.New(ctx, backoff.Config{
195 MinBackoff: 1 * time.Second,
196 MaxBackoff: 1 * time.Minute,
197 })
198
199 // Ensure the context used by the Watch is always cancelled.
200 watchCtx, cancel := context.WithCancel(ctx)
201 defer cancel()
202
203outer:
204 for backoff.Ongoing() {
205 for resp := range c.cli.Watch(watchCtx, key) {
206 if err := resp.Err(); err != nil {
207 level.Error(c.logger).Log("msg", "watch error", "key", key, "err", err)
208 continue outer
209 }
210
211 backoff.Reset()
212
213 for _, event := range resp.Events {
214 out, err := c.codec.Decode(event.Kv.Value)
215 if err != nil {
216 level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err)
217 continue
218 }
219
220 if !f(out) {
221 return
222 }
223 }
224 }
225 }
226}
227
228// WatchPrefix implements kv.Client.
229func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) {

Callers

nothing calls this directly

Calls 8

NewFunction · 0.92
OngoingMethod · 0.80
DecodeMethod · 0.65
WatchMethod · 0.45
ErrMethod · 0.45
LogMethod · 0.45
ErrorMethod · 0.45
ResetMethod · 0.45

Tested by

no test coverage detected