| 134 | } |
| 135 | |
| 136 | func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { |
| 137 | retries := c.cfg.MaxCasRetries |
| 138 | if retries == 0 { |
| 139 | retries = 10 |
| 140 | } |
| 141 | |
| 142 | sleepBeforeRetry := time.Duration(0) |
| 143 | if c.cfg.CasRetryDelay > 0 { |
| 144 | sleepBeforeRetry = time.Duration(rand.Int63n(c.cfg.CasRetryDelay.Nanoseconds())) |
| 145 | } |
| 146 | |
| 147 | index := uint64(0) |
| 148 | for i := 0; i < retries; i++ { |
| 149 | if i > 0 && sleepBeforeRetry > 0 { |
| 150 | time.Sleep(sleepBeforeRetry) |
| 151 | } |
| 152 | |
| 153 | // Get with default options - don't want stale data to compare with |
| 154 | options := &consul.QueryOptions{} |
| 155 | kvp, _, err := c.kv.Get(key, options.WithContext(ctx)) |
| 156 | if err != nil { |
| 157 | level.Error(c.logger).Log("msg", "error getting key", "key", key, "err", err) |
| 158 | continue |
| 159 | } |
| 160 | var intermediate interface{} |
| 161 | if kvp != nil { |
| 162 | out, err := c.codec.Decode(kvp.Value) |
| 163 | if err != nil { |
| 164 | level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) |
| 165 | continue |
| 166 | } |
| 167 | // If key doesn't exist, index will be 0. |
| 168 | index = kvp.ModifyIndex |
| 169 | intermediate = out |
| 170 | } |
| 171 | |
| 172 | intermediate, retry, err := f(intermediate) |
| 173 | if err != nil { |
| 174 | if !retry { |
| 175 | return err |
| 176 | } |
| 177 | continue |
| 178 | } |
| 179 | |
| 180 | // Treat the callback returning nil for intermediate as a decision to |
| 181 | // not actually write to Consul, but this is not an error. |
| 182 | if intermediate == nil { |
| 183 | return nil |
| 184 | } |
| 185 | |
| 186 | bytes, err := c.codec.Encode(intermediate) |
| 187 | if err != nil { |
| 188 | level.Error(c.logger).Log("msg", "error serialising value", "key", key, "err", err) |
| 189 | continue |
| 190 | } |
| 191 | ok, _, err := c.kv.CAS(&consul.KVPair{ |
| 192 | Key: key, |
| 193 | Value: bytes, |