MCPcopy
hub / github.com/grafana/dskit / trySingleCas

Method trySingleCas

kv/memberlist/memberlist_client.go:1347–1388  ·  view source on GitHub ↗

Returns the change, the new version, whether to retry, whether the value was deleted, the update time, and an error (nil if the CAS succeeded). Returns errNoChangeDetected if the merge failed to detect a change in f's output.

(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error))

Source from the content-addressed store, hash-verified

1345// returns change, error (or nil, if CAS succeeded), and whether to retry or not.
1346// returns errNoChangeDetected if merge failed to detect change in f's output.
1347func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, bool, time.Time, error) {
1348 val, ver, err := m.get(key, codec)
1349 if err != nil {
1350 return nil, 0, false, false, time.Time{}, fmt.Errorf("failed to get value: %v", err)
1351 }
1352
1353 out, retry, err := f(val)
1354 if err != nil {
1355 return nil, 0, retry, false, time.Time{}, fmt.Errorf("fn returned error: %v", err)
1356 }
1357
1358 if out == nil {
1359 // no change to be done
1360 return nil, 0, false, false, time.Time{}, nil
1361 }
1362
1363 // Don't even try
1364 incomingValue, ok := out.(Mergeable)
1365 if !ok || incomingValue == nil {
1366 return nil, 0, retry, false, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out)
1367 }
1368
1369 // To support detection of removed items from value, we will only allow CAS operation to
1370 // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'.
1371 // Supplied function may have kept a reference to the returned "incoming value".
1372 // If KV store will keep this value as well, it needs to make a clone.
1373 change, newver, deleted, updated, err := m.mergeValueForKey(key, incomingValue, true, ver, codec.CodecID(), false, time.Now())
1374 if err == errVersionMismatch {
1375 return nil, 0, retry, false, time.Time{}, err
1376 }
1377
1378 if err != nil {
1379 return nil, 0, retry, false, time.Time{}, fmt.Errorf("merge failed: %v", err)
1380 }
1381
1382 if newver == 0 {
1383 // CAS method reacts on this error
1384 return nil, 0, retry, deleted, updated, errNoChangeDetected
1385 }
1386
1387 return change, newver, retry, deleted, updated, nil
1388}
1389
1390func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) {
1391 if locallyGenerated && m.State() != services.Running {

Callers 1

CASMethod · 0.95

Calls 4

getMethod · 0.95
mergeValueForKeyMethod · 0.95
ErrorfMethod · 0.80
CodecIDMethod · 0.65

Tested by

no test coverage detected