LocalState is a method from the Memberlist Delegate interface. This is the "pull" part of push/pull sync (either periodic, or when a new node joins the cluster). Here we dump our entire state -- all keys and their values. There is no limit on message size here, as Memberlist uses 'stream' operations for transferring this state.
(_ bool)
| 1569 | // Here we dump our entire state -- all keys and their values. There is no limit on message size here, |
| 1570 | // as Memberlist uses 'stream' operations for transferring this state. |
| 1571 | func (m *KV) LocalState(_ bool) []byte { |
| 1572 | if !m.delegateReady.Load() { |
| 1573 | return nil |
| 1574 | } |
| 1575 | |
| 1576 | m.numberOfPulls.Inc() |
| 1577 | |
| 1578 | m.storeMu.Lock() |
| 1579 | defer m.storeMu.Unlock() |
| 1580 | |
| 1581 | // For each Key/Value pair in our store, we write |
| 1582 | // [4-bytes length of marshalled KV pair] [marshalled KV pair] |
| 1583 | |
| 1584 | buf := bytes.Buffer{} |
| 1585 | sent := time.Now() |
| 1586 | |
| 1587 | kvPair := KeyValuePair{} |
| 1588 | for key, val := range m.store { |
| 1589 | if val.value == nil { |
| 1590 | continue |
| 1591 | } |
| 1592 | |
| 1593 | codec := m.GetCodec(val.CodecID) |
| 1594 | if codec == nil { |
| 1595 | level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.CodecID, "key", key) |
| 1596 | continue |
| 1597 | } |
| 1598 | |
| 1599 | encoded, err := codec.Encode(val.value) |
| 1600 | if err != nil { |
| 1601 | level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err) |
| 1602 | continue |
| 1603 | } |
| 1604 | |
| 1605 | kvPair.Reset() |
| 1606 | kvPair.Key = key |
| 1607 | kvPair.Value = encoded |
| 1608 | kvPair.Codec = val.CodecID |
| 1609 | kvPair.Deleted = val.Deleted |
| 1610 | kvPair.UpdateTimeMillis = updateTimeMillis(val.UpdateTime) |
| 1611 | |
| 1612 | ser, err := kvPair.Marshal() |
| 1613 | if err != nil { |
| 1614 | level.Error(m.logger).Log("msg", "failed to serialize KV Pair", "err", err) |
| 1615 | continue |
| 1616 | } |
| 1617 | |
| 1618 | if uint(len(ser)) > math.MaxUint32 { |
| 1619 | level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded)) |
| 1620 | continue |
| 1621 | } |
| 1622 | |
| 1623 | err = binary.Write(&buf, binary.BigEndian, uint32(len(ser))) |
| 1624 | if err != nil { |
| 1625 | level.Error(m.logger).Log("msg", "failed to write uint32 to buffer?", "err", err) |
| 1626 | continue |
| 1627 | } |
| 1628 | buf.Write(ser) |