MCPcopy
hub / github.com/grafana/dskit / CAS

Method CAS

kv/memberlist/memberlist_client.go:1295–1343  ·  view source on GitHub ↗

CAS implements Compare-And-Set/Swap operation. CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately. This method combines Compare-And-Swap with Merge: it calls 'f' function to get a new state, and then merges this new state into curre

(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error))

Source from the content-addressed store, hash-verified

1293// detect removals. If Merge doesn't result in any change (returns nil), then operation fails and is retried again.
1294// After too many failed retries, this method returns error.
1295func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error {
1296 var lastError error
1297
1298outer:
1299 for retries := m.maxCasRetries; retries > 0; retries-- {
1300 m.casAttempts.Inc()
1301
1302 if lastError == errNoChangeDetected {
1303 // We only get here, if 'f' reports some change, but Merge function reports no change. This can happen
1304 // with Ring's merge function, which depends on timestamps (and not the tokens) with 1-second resolution.
1305 // By waiting for one second, we hope that Merge will be able to detect change from 'f' function.
1306
1307 select {
1308 case <-time.After(noChangeDetectedRetrySleep):
1309 // ok
1310 case <-ctx.Done():
1311 lastError = ctx.Err()
1312 break outer
1313 }
1314 }
1315
1316 change, newver, retry, deleted, updated, err := m.trySingleCas(key, codec, f)
1317 if err != nil {
1318 level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry)
1319
1320 lastError = err
1321 if !retry {
1322 break
1323 }
1324 continue
1325 }
1326
1327 if change != nil {
1328 m.casSuccesses.Inc()
1329 m.notifyWatchers(key)
1330
1331 m.broadcastNewValue(key, change, newver, codec, true, deleted, updated)
1332 }
1333
1334 return nil
1335 }
1336 if errors.Is(lastError, errVersionMismatch) {
1337 // Version mismatch err on CAS would have been retried up to the limit
1338 lastError = errors.Wrap(lastError, "too many retries")
1339 }
1340
1341 m.casFailures.Inc()
1342 return fmt.Errorf("failed to CAS-update key %s: %v", key, lastError)
1343}
1344
1345// returns change, error (or nil, if CAS succeeded), and whether to retry or not.
1346// returns errNoChangeDetected if merge failed to detect change in f's output.

Calls 10

trySingleCasMethod · 0.95
notifyWatchersMethod · 0.95
broadcastNewValueMethod · 0.95
IsMethod · 0.80
ErrorfMethod · 0.80
AfterMethod · 0.65
DoneMethod · 0.65
WrapMethod · 0.65
ErrMethod · 0.45
LogMethod · 0.45