MergeRemoteState is a method from the Memberlist Delegate interface. This is the 'push' part of push/pull sync: we merge the incoming KV store (all keys and values) with ours. Data is the full state of the remote KV store, as generated by the LocalState method (run on another node).
(data []byte, _ bool)
| 1645 | // |
| 1646 | // Data is full state of remote KV store, as generated by LocalState method (run on another node). |
| 1647 | func (m *KV) MergeRemoteState(data []byte, _ bool) { |
| 1648 | if !m.delegateReady.Load() { |
| 1649 | return |
| 1650 | } |
| 1651 | |
| 1652 | received := time.Now() |
| 1653 | |
| 1654 | m.numberOfPushes.Inc() |
| 1655 | m.totalSizeOfPushes.Add(float64(len(data))) |
| 1656 | |
| 1657 | kvPair := KeyValuePair{} |
| 1658 | |
| 1659 | var err error |
| 1660 | // Data contains individual KV pairs (encoded as protobuf messages), each prefixed with 4 bytes length of KV pair: |
| 1661 | // [4-bytes length of marshalled KV pair] [marshalled KV pair] [4-bytes length] [KV pair]... |
| 1662 | for len(data) > 0 { |
| 1663 | if len(data) < 4 { |
| 1664 | err = fmt.Errorf("not enough data left for another KV Pair: %d", len(data)) |
| 1665 | break |
| 1666 | } |
| 1667 | |
| 1668 | kvPairLength := binary.BigEndian.Uint32(data) |
| 1669 | |
| 1670 | data = data[4:] |
| 1671 | |
| 1672 | if len(data) < int(kvPairLength) { |
| 1673 | err = fmt.Errorf("not enough data left for next KV Pair, expected %d, remaining %d bytes", kvPairLength, len(data)) |
| 1674 | break |
| 1675 | } |
| 1676 | |
| 1677 | kvPair.Reset() |
| 1678 | err = kvPair.Unmarshal(data[:kvPairLength]) |
| 1679 | if err != nil { |
| 1680 | err = fmt.Errorf("failed to parse KV Pair: %v", err) |
| 1681 | break |
| 1682 | } |
| 1683 | |
| 1684 | data = data[kvPairLength:] |
| 1685 | |
| 1686 | codec := m.GetCodec(kvPair.GetCodec()) |
| 1687 | if codec == nil { |
| 1688 | level.Error(m.logger).Log("msg", "failed to parse remote state: unknown codec for key", "codec", kvPair.GetCodec(), "key", kvPair.GetKey()) |
| 1689 | continue |
| 1690 | } |
| 1691 | |
| 1692 | // we have both key and value, try to merge it with our state |
| 1693 | change, newver, deleted, updated, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime(kvPair.UpdateTimeMillis)) |
| 1694 | |
| 1695 | changes := []string(nil) |
| 1696 | if change != nil { |
| 1697 | changes = change.MergeContent() |
| 1698 | } |
| 1699 | |
| 1700 | m.addReceivedMessage(Message{ |
| 1701 | Time: received, |
| 1702 | Size: int(kvPairLength), |
| 1703 | Pair: kvPair, // Makes a copy of kvPair. |
| 1704 | Version: newver, |