WatchKey implements kv.Client.
(ctx context.Context, key string, f func(interface{}) bool)
| 191 | |
| 192 | // WatchKey implements kv.Client. |
| 193 | func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { |
| 194 | backoff := backoff.New(ctx, backoff.Config{ |
| 195 | MinBackoff: 1 * time.Second, |
| 196 | MaxBackoff: 1 * time.Minute, |
| 197 | }) |
| 198 | |
| 199 | // Ensure the context used by the Watch is always cancelled. |
| 200 | watchCtx, cancel := context.WithCancel(ctx) |
| 201 | defer cancel() |
| 202 | |
| 203 | outer: |
| 204 | for backoff.Ongoing() { |
| 205 | for resp := range c.cli.Watch(watchCtx, key) { |
| 206 | if err := resp.Err(); err != nil { |
| 207 | level.Error(c.logger).Log("msg", "watch error", "key", key, "err", err) |
| 208 | continue outer |
| 209 | } |
| 210 | |
| 211 | backoff.Reset() |
| 212 | |
| 213 | for _, event := range resp.Events { |
| 214 | out, err := c.codec.Decode(event.Kv.Value) |
| 215 | if err != nil { |
| 216 | level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) |
| 217 | continue |
| 218 | } |
| 219 | |
| 220 | if !f(out) { |
| 221 | return |
| 222 | } |
| 223 | } |
| 224 | } |
| 225 | } |
| 226 | } |
| 227 | |
| 228 | // WatchPrefix implements kv.Client. |
| 229 | func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) { |