Returns the change, an error (nil if the CAS succeeded), and whether to retry. Returns errNoChangeDetected if the merge failed to detect a change in f's output.
(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error))
| 1345 | // returns change, error (or nil, if CAS succeeded), and whether to retry or not. |
| 1346 | // returns errNoChangeDetected if merge failed to detect change in f's output. |
| 1347 | func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, bool, time.Time, error) { |
| 1348 | val, ver, err := m.get(key, codec) |
| 1349 | if err != nil { |
| 1350 | return nil, 0, false, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) |
| 1351 | } |
| 1352 | |
| 1353 | out, retry, err := f(val) |
| 1354 | if err != nil { |
| 1355 | return nil, 0, retry, false, time.Time{}, fmt.Errorf("fn returned error: %v", err) |
| 1356 | } |
| 1357 | |
| 1358 | if out == nil { |
| 1359 | // no change to be done |
| 1360 | return nil, 0, false, false, time.Time{}, nil |
| 1361 | } |
| 1362 | |
| 1363 | // Don't even try |
| 1364 | incomingValue, ok := out.(Mergeable) |
| 1365 | if !ok || incomingValue == nil { |
| 1366 | return nil, 0, retry, false, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) |
| 1367 | } |
| 1368 | |
| 1369 | // To support detection of removed items from value, we will only allow CAS operation to |
| 1370 | // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. |
| 1371 | // Supplied function may have kept a reference to the returned "incoming value". |
| 1372 | // If KV store will keep this value as well, it needs to make a clone. |
| 1373 | change, newver, deleted, updated, err := m.mergeValueForKey(key, incomingValue, true, ver, codec.CodecID(), false, time.Now()) |
| 1374 | if err == errVersionMismatch { |
| 1375 | return nil, 0, retry, false, time.Time{}, err |
| 1376 | } |
| 1377 | |
| 1378 | if err != nil { |
| 1379 | return nil, 0, retry, false, time.Time{}, fmt.Errorf("merge failed: %v", err) |
| 1380 | } |
| 1381 | |
| 1382 | if newver == 0 { |
| 1383 | // CAS method reacts on this error |
| 1384 | return nil, 0, retry, deleted, updated, errNoChangeDetected |
| 1385 | } |
| 1386 | |
| 1387 | return change, newver, retry, deleted, updated, nil |
| 1388 | } |
| 1389 | |
| 1390 | func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { |
| 1391 | if locallyGenerated && m.State() != services.Running { |
no test coverage detected