Merges incoming value with value we have in our store. Returns "a change" that can be sent to other cluster members to update their state, and new version of the value. If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. If no modification occurred, new version is 0.
(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time)
| 1742 | // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. |
| 1743 | // If no modification occurred, new version is 0. |
| 1744 | func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codecID string, deleted bool, updateTime time.Time) (change Mergeable, newVersion uint, newDeleted bool, newUpdated time.Time, err error) { |
| 1745 | m.storeMu.Lock() |
| 1746 | defer m.storeMu.Unlock() |
| 1747 | |
| 1748 | // Note that we do not take a deep copy of curr.value here, it is modified in-place. |
| 1749 | // This is safe because the entire function runs under the store lock; we do not return |
| 1750 | // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS). |
| 1751 | curr := m.store[key] |
| 1752 | |
| 1753 | // if current entry is nil but the incoming for that key is deleted then we return no change, as we do not want to revive the entry. |
| 1754 | if curr.value == nil && deleted { |
| 1755 | return nil, 0, false, time.Time{}, err |
| 1756 | } |
| 1757 | |
| 1758 | // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. |
| 1759 | if casVersion > 0 && curr.Version != casVersion { |
| 1760 | return nil, 0, false, time.Time{}, errVersionMismatch |
| 1761 | } |
| 1762 | result, change, err := computeNewValue(incomingValue, incomingValueRequiresClone, curr.value, casVersion > 0) |
| 1763 | if err != nil { |
| 1764 | return nil, 0, false, time.Time{}, err |
| 1765 | } |
| 1766 | newUpdated = curr.UpdateTime |
| 1767 | newDeleted = curr.Deleted |
| 1768 | |
| 1769 | // If incoming value is newer and marks the entry as deleted, use its timestamp and deleted value |
| 1770 | if !updateTime.IsZero() && updateTime.After(newUpdated) && deleted { |
| 1771 | newUpdated = updateTime |
| 1772 | newDeleted = deleted |
| 1773 | } |
| 1774 | |
| 1775 | // No change, don't store it. |
| 1776 | if (change == nil || len(change.MergeContent()) == 0) && curr.Deleted == newDeleted { |
| 1777 | return nil, 0, curr.Deleted, curr.UpdateTime, nil |
| 1778 | } |
| 1779 | |
| 1780 | if m.cfg.LeftIngestersTimeout > 0 { |
| 1781 | limit := time.Now().Add(-m.cfg.LeftIngestersTimeout) |
| 1782 | total, removed := result.RemoveTombstones(limit) |
| 1783 | m.storeTombstones.WithLabelValues(key).Set(float64(total)) |
| 1784 | m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) |
| 1785 | |
| 1786 | // Remove tombstones from change too. If change turns out to be empty after this, |
| 1787 | // we don't need to gossip the change. However, the local value will always be updated. |
| 1788 | // |
| 1789 | // Note that "result" and "change" may actually be the same Mergeable. That is why we |
| 1790 | // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling |
| 1791 | // RemoveTombstones twice with same limit should be noop. |
| 1792 | if change != nil { |
| 1793 | change.RemoveTombstones(limit) |
| 1794 | if len(change.MergeContent()) == 0 { |
| 1795 | return nil, 0, curr.Deleted, curr.UpdateTime, nil |
| 1796 | } |
| 1797 | } |
| 1798 | } |
| 1799 | |
| 1800 | if change == nil && curr.Deleted != newDeleted { |
| 1801 | // return result as change if the only thing that changes is the Delete state of the entry. |
no test coverage detected