WatchPrefix implements kv.Client.
(ctx context.Context, key string, f func(string, interface{}) bool)
| 227 | |
| 228 | // WatchPrefix implements kv.Client. |
| 229 | func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) { |
| 230 | backoff := backoff.New(ctx, backoff.Config{ |
| 231 | MinBackoff: 1 * time.Second, |
| 232 | MaxBackoff: 1 * time.Minute, |
| 233 | }) |
| 234 | |
| 235 | // Ensure the context used by the Watch is always cancelled. |
| 236 | watchCtx, cancel := context.WithCancel(ctx) |
| 237 | defer cancel() |
| 238 | |
| 239 | outer: |
| 240 | for backoff.Ongoing() { |
| 241 | for resp := range c.cli.Watch(watchCtx, key, clientv3.WithPrefix()) { |
| 242 | if err := resp.Err(); err != nil { |
| 243 | level.Error(c.logger).Log("msg", "watch error", "key", key, "err", err) |
| 244 | continue outer |
| 245 | } |
| 246 | |
| 247 | backoff.Reset() |
| 248 | |
| 249 | for _, event := range resp.Events { |
| 250 | if event.Kv.Version == 0 && event.Kv.Value == nil { |
| 251 | // Delete notification. Since not all KV store clients (and codecs) support this, we ignore it. |
| 252 | continue |
| 253 | } |
| 254 | |
| 255 | out, err := c.codec.Decode(event.Kv.Value) |
| 256 | if err != nil { |
| 257 | level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) |
| 258 | continue |
| 259 | } |
| 260 | |
| 261 | if !f(string(event.Kv.Key), out) { |
| 262 | return |
| 263 | } |
| 264 | } |
| 265 | } |
| 266 | } |
| 267 | } |
| 268 | |
| 269 | // List implements kv.Client. |
| 270 | func (c *Client) List(ctx context.Context, prefix string) ([]string, error) { |