MCPcopy
hub / github.com/grafana/dskit / MergeRemoteState

Method MergeRemoteState

kv/memberlist/memberlist_client.go:1647–1719  ·  view source on GitHub ↗

MergeRemoteState is a method from the Memberlist Delegate interface. This is 'push' part of push/pull sync. We merge incoming KV store (all keys and values) with ours. Data is full state of remote KV store, as generated by LocalState method (run on another node).

(data []byte, _ bool)

Source from the content-addressed store, hash-verified

1645//
1646// Data is full state of remote KV store, as generated by LocalState method (run on another node).
1647func (m *KV) MergeRemoteState(data []byte, _ bool) {
1648 if !m.delegateReady.Load() {
1649 return
1650 }
1651
1652 received := time.Now()
1653
1654 m.numberOfPushes.Inc()
1655 m.totalSizeOfPushes.Add(float64(len(data)))
1656
1657 kvPair := KeyValuePair{}
1658
1659 var err error
1660 // Data contains individual KV pairs (encoded as protobuf messages), each prefixed with 4 bytes length of KV pair:
1661 // [4-bytes length of marshalled KV pair] [marshalled KV pair] [4-bytes length] [KV pair]...
1662 for len(data) > 0 {
1663 if len(data) < 4 {
1664 err = fmt.Errorf("not enough data left for another KV Pair: %d", len(data))
1665 break
1666 }
1667
1668 kvPairLength := binary.BigEndian.Uint32(data)
1669
1670 data = data[4:]
1671
1672 if len(data) < int(kvPairLength) {
1673 err = fmt.Errorf("not enough data left for next KV Pair, expected %d, remaining %d bytes", kvPairLength, len(data))
1674 break
1675 }
1676
1677 kvPair.Reset()
1678 err = kvPair.Unmarshal(data[:kvPairLength])
1679 if err != nil {
1680 err = fmt.Errorf("failed to parse KV Pair: %v", err)
1681 break
1682 }
1683
1684 data = data[kvPairLength:]
1685
1686 codec := m.GetCodec(kvPair.GetCodec())
1687 if codec == nil {
1688 level.Error(m.logger).Log("msg", "failed to parse remote state: unknown codec for key", "codec", kvPair.GetCodec(), "key", kvPair.GetKey())
1689 continue
1690 }
1691
1692 // we have both key and value, try to merge it with our state
1693 change, newver, deleted, updated, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime(kvPair.UpdateTimeMillis))
1694
1695 changes := []string(nil)
1696 if change != nil {
1697 changes = change.MergeContent()
1698 }
1699
1700 m.addReceivedMessage(Message{
1701 Time: received,
1702 Size: int(kvPairLength),
1703 Pair: kvPair, // Makes a copy of kvPair.
1704 Version: newver,

Calls 15

ResetMethod · 0.95
UnmarshalMethod · 0.95
GetCodecMethod · 0.95
GetCodecMethod · 0.95
GetKeyMethod · 0.95
mergeBytesValueForKeyMethod · 0.95
addReceivedMessageMethod · 0.95
notifyWatchersMethod · 0.95
broadcastNewValueMethod · 0.95
updateTimeFunction · 0.85
ErrorfMethod · 0.80
AddMethod · 0.65