hub / github.com/grafana/dskit / LocalState

Method LocalState

kv/memberlist/memberlist_client.go:1571–1640

LocalState is a method of the Memberlist Delegate interface. It is the "pull" part of push/pull sync, which happens either periodically or when a new node joins the cluster. Here we dump our entire state: all keys and their values. There is no limit on message size here, as Memberlist uses 'stream' operations for transferring this state.
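The push/pull exchange described above can be illustrated with a toy delegate. This is a minimal sketch, not dskit's implementation: `stateExchanger`, `toyKV`, and `newToyKV` are hypothetical names, the JSON encoding is chosen only for brevity, and the interface is a local copy of just the two state-exchange methods from memberlist's `Delegate` so that the example compiles without external dependencies.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// stateExchanger is a hypothetical local copy of the two push/pull methods
// of memberlist's Delegate interface (the real interface has more methods).
type stateExchanger interface {
	LocalState(join bool) []byte
	MergeRemoteState(buf []byte, join bool)
}

// toyKV is an illustrative in-memory store, not the dskit KV type.
type toyKV struct {
	mu    sync.Mutex
	store map[string]string
}

func newToyKV() *toyKV { return &toyKV{store: map[string]string{}} }

// LocalState dumps the whole store; the remote side receives these bytes.
func (t *toyKV) LocalState(_ bool) []byte {
	t.mu.Lock()
	defer t.mu.Unlock()
	buf, err := json.Marshal(t.store)
	if err != nil {
		return nil
	}
	return buf
}

// MergeRemoteState decodes a remote dump and adds keys we do not have yet.
// A real implementation would merge values instead of keeping the local one.
func (t *toyKV) MergeRemoteState(buf []byte, _ bool) {
	remote := map[string]string{}
	if err := json.Unmarshal(buf, &remote); err != nil {
		return
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	for k, v := range remote {
		if _, ok := t.store[k]; !ok {
			t.store[k] = v
		}
	}
}

// Compile-time check that toyKV satisfies the sketch interface.
var _ stateExchanger = (*toyKV)(nil)

func main() {
	a, b := newToyKV(), newToyKV()
	a.store["ring"] = "view-from-a"
	b.store["ha"] = "view-from-b"

	// Each side sends its full state and merges what it receives.
	b.MergeRemoteState(a.LocalState(false), false)
	a.MergeRemoteState(b.LocalState(false), false)

	fmt.Println(len(a.store), len(b.store))
}
```

In the real library the transport between the two calls is handled by memberlist itself; the delegate only produces and consumes byte slices.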

(_ bool)

Source from the content-addressed store, hash-verified

// Here we dump our entire state -- all keys and their values. There is no limit on message size here,
// as Memberlist uses 'stream' operations for transferring this state.
func (m *KV) LocalState(_ bool) []byte {
	if !m.delegateReady.Load() {
		return nil
	}

	m.numberOfPulls.Inc()

	m.storeMu.Lock()
	defer m.storeMu.Unlock()

	// For each Key/Value pair in our store, we write
	// [4-bytes length of marshalled KV pair] [marshalled KV pair]

	buf := bytes.Buffer{}
	sent := time.Now()

	kvPair := KeyValuePair{}
	for key, val := range m.store {
		if val.value == nil {
			continue
		}

		codec := m.GetCodec(val.CodecID)
		if codec == nil {
			level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.CodecID, "key", key)
			continue
		}

		encoded, err := codec.Encode(val.value)
		if err != nil {
			level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err)
			continue
		}

		kvPair.Reset()
		kvPair.Key = key
		kvPair.Value = encoded
		kvPair.Codec = val.CodecID
		kvPair.Deleted = val.Deleted
		kvPair.UpdateTimeMillis = updateTimeMillis(val.UpdateTime)

		ser, err := kvPair.Marshal()
		if err != nil {
			level.Error(m.logger).Log("msg", "failed to serialize KV Pair", "err", err)
			continue
		}

		if uint(len(ser)) > math.MaxUint32 {
			level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded))
			continue
		}

		err = binary.Write(&buf, binary.BigEndian, uint32(len(ser)))
		if err != nil {
			level.Error(m.logger).Log("msg", "failed to write uint32 to buffer?", "err", err)
			continue
		}
		buf.Write(ser)
		// ... (excerpt ends here; the method continues through line 1640)

Calls 11

GetCodec · Method · 0.95
Reset · Method · 0.95
Marshal · Method · 0.95
addSentMessage · Method · 0.95
updateTimeMillis · Function · 0.85
Encode · Method · 0.65
Add · Method · 0.65
Log · Method · 0.45
Error · Method · 0.45
Write · Method · 0.45
Len · Method · 0.45