CAS implements Compare-And-Set/Swap operation. CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately. This method combines Compare-And-Swap with Merge: it calls 'f' function to get a new state, and then merges this new state into curre
(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error))
| 1293 | // detect removals. If Merge doesn't result in any change (returns nil), then operation fails and is retried again. |
| 1294 | // After too many failed retries, this method returns error. |
| 1295 | func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error { |
| 1296 | var lastError error |
| 1297 | |
| 1298 | outer: |
| 1299 | for retries := m.maxCasRetries; retries > 0; retries-- { |
| 1300 | m.casAttempts.Inc() |
| 1301 | |
| 1302 | if lastError == errNoChangeDetected { |
| 1303 | // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen |
| 1304 | // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution. |
| 1305 | // By waiting for one second, we hope that Merge will be able to detect change from 'f' function. |
| 1306 | |
| 1307 | select { |
| 1308 | case <-time.After(noChangeDetectedRetrySleep): |
| 1309 | // ok |
| 1310 | case <-ctx.Done(): |
| 1311 | lastError = ctx.Err() |
| 1312 | break outer |
| 1313 | } |
| 1314 | } |
| 1315 | |
| 1316 | change, newver, retry, deleted, updated, err := m.trySingleCas(key, codec, f) |
| 1317 | if err != nil { |
| 1318 | level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) |
| 1319 | |
| 1320 | lastError = err |
| 1321 | if !retry { |
| 1322 | break |
| 1323 | } |
| 1324 | continue |
| 1325 | } |
| 1326 | |
| 1327 | if change != nil { |
| 1328 | m.casSuccesses.Inc() |
| 1329 | m.notifyWatchers(key) |
| 1330 | |
| 1331 | m.broadcastNewValue(key, change, newver, codec, true, deleted, updated) |
| 1332 | } |
| 1333 | |
| 1334 | return nil |
| 1335 | } |
| 1336 | if errors.Is(lastError, errVersionMismatch) { |
| 1337 | // Version mismatch err on CAS would have been retried up to the limit |
| 1338 | lastError = errors.Wrap(lastError, "too many retries") |
| 1339 | } |
| 1340 | |
| 1341 | m.casFailures.Inc() |
| 1342 | return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError) |
| 1343 | } |
| 1344 | |
| 1345 | // returns change, error (or nil, if CAS succeeded), and whether to retry or not. |
| 1346 | // returns errNoChangeDetected if merge failed to detect change in f's output. |