CAS implements kv.Client.
(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error))
| 121 | |
| 122 | // CAS implements kv.Client. |
| 123 | func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { |
| 124 | var revision int64 |
| 125 | var lastErr error |
| 126 | |
| 127 | for i := 0; i < c.cfg.MaxRetries; i++ { |
| 128 | resp, err := c.cli.Get(ctx, key) |
| 129 | if err != nil { |
| 130 | level.Error(c.logger).Log("msg", "error getting key", "key", key, "err", err) |
| 131 | lastErr = err |
| 132 | continue |
| 133 | } |
| 134 | |
| 135 | var intermediate interface{} |
| 136 | if len(resp.Kvs) > 0 { |
| 137 | intermediate, err = c.codec.Decode(resp.Kvs[0].Value) |
| 138 | if err != nil { |
| 139 | level.Error(c.logger).Log("msg", "error decoding key", "key", key, "err", err) |
| 140 | lastErr = err |
| 141 | continue |
| 142 | } |
| 143 | revision = resp.Kvs[0].Version |
| 144 | } |
| 145 | |
| 146 | var retry bool |
| 147 | intermediate, retry, err = f(intermediate) |
| 148 | if err != nil { |
| 149 | if !retry { |
| 150 | return err |
| 151 | } |
| 152 | lastErr = err |
| 153 | continue |
| 154 | } |
| 155 | |
| 156 | // Callback returning nil means it doesn't want to CAS anymore. |
| 157 | if intermediate == nil { |
| 158 | return nil |
| 159 | } |
| 160 | |
| 161 | buf, err := c.codec.Encode(intermediate) |
| 162 | if err != nil { |
| 163 | level.Error(c.logger).Log("msg", "error serialising value", "key", key, "err", err) |
| 164 | lastErr = err |
| 165 | continue |
| 166 | } |
| 167 | |
| 168 | result, err := c.cli.Txn(ctx). |
| 169 | If(clientv3.Compare(clientv3.Version(key), "=", revision)). |
| 170 | Then(clientv3.OpPut(key, string(buf))). |
| 171 | Commit() |
| 172 | if err != nil { |
| 173 | level.Error(c.logger).Log("msg", "error CASing", "key", key, "err", err) |
| 174 | lastErr = err |
| 175 | continue |
| 176 | } |
| 177 | // result is not Succeeded if the the comparison was false, meaning if the modify indexes did not match. |
| 178 | if !result.Succeeded { |
| 179 | level.Debug(c.logger).Log("msg", "failed to CAS, revision and version did not match in etcd", "key", key, "revision", revision) |
| 180 | continue |