MCPcopy
hub / github.com/grafana/dskit / mergeValueForKey

Method mergeValueForKey

kv/memberlist/memberlist_client.go:1744–1820  ·  view source on GitHub ↗

Merges incoming value with value we have in our store. Returns "a change" that can be sent to other cluster members to update their state, and new version of the value. If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. If no modific

(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time)

Source from the content-addressed store, hash-verified

1742// If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported.
1743// If no modification occurred, new version is 0.
1744func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (change Mergeable, newVersion uint, newDeleted bool, newUpdated time.Time, err error) {
1745 m.storeMu.Lock()
1746 defer m.storeMu.Unlock()
1747
1748 // Note that we do not take a deep copy of curr.value here, it is modified in-place.
1749 // This is safe because the entire function runs under the store lock; we do not return
1750 // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
1751 curr := m.store[key]
1752
1753 // if current entry is nil but the incoming for that key is deleted then we return no change, as we do not want to revive the entry.
1754 if curr.value == nil && deleted {
1755 return nil, 0, false, time.Time{}, err
1756 }
1757
1758 // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
1759 if casVersion > 0 && curr.Version != casVersion {
1760 return nil, 0, false, time.Time{}, errVersionMismatch
1761 }
1762 result, change, err := computeNewValue(incomingValue, incomingValueRequiresClone, curr.value, casVersion > 0)
1763 if err != nil {
1764 return nil, 0, false, time.Time{}, err
1765 }
1766 newUpdated = curr.UpdateTime
1767 newDeleted = curr.Deleted
1768
1769 // If incoming value is newer, use its timestamp and deleted value
1770 if !updateTime.IsZero() && updateTime.After(newUpdated) && deleted {
1771 newUpdated = updateTime
1772 newDeleted = deleted
1773 }
1774
1775 // No change, don't store it.
1776 if (change == nil || len(change.MergeContent()) == 0) && curr.Deleted == newDeleted {
1777 return nil, 0, curr.Deleted, curr.UpdateTime, nil
1778 }
1779
1780 if m.cfg.LeftIngestersTimeout > 0 {
1781 limit := time.Now().Add(-m.cfg.LeftIngestersTimeout)
1782 total, removed := result.RemoveTombstones(limit)
1783 m.storeTombstones.WithLabelValues(key).Set(float64(total))
1784 m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))
1785
1786 // Remove tombstones from change too. If change turns out to be empty after this,
1787 // we don't need to gossip the change. However, the local value will be always be updated.
1788 //
1789 // Note that "result" and "change" may actually be the same Mergeable. That is why we
1790 // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
1791 // RemoveTombstones twice with same limit should be noop.
1792 if change != nil {
1793 change.RemoveTombstones(limit)
1794 if len(change.MergeContent()) == 0 {
1795 return nil, 0, curr.Deleted, curr.UpdateTime, nil
1796 }
1797 }
1798 }
1799
1800 if change == nil && curr.Deleted != newDeleted {
1801 // return result as change if the only thing that changes is the Delete state of the entry.

Callers 3

DeleteMethod · 0.95
trySingleCasMethod · 0.95
mergeBytesValueForKeyMethod · 0.95

Calls 7

computeNewValueFunction · 0.85
AfterMethod · 0.65
MergeContentMethod · 0.65
AddMethod · 0.65
RemoveTombstonesMethod · 0.65
SetMethod · 0.65
CloneMethod · 0.65

Tested by

no test coverage detected