MCPcopy
hub / github.com/grafana/dskit / broadcastNewValue

Method broadcastNewValue

kv/memberlist/memberlist_client.go:1390–1439  ·  view source on GitHub ↗
(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time)

Source from the content-addressed store, hash-verified

1388}
1389
1390func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) {
1391 if locallyGenerated && m.State() != services.Running {
1392 level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key)
1393 return
1394 }
1395 data, err := codec.Encode(change)
1396
1397 if err != nil {
1398 level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err)
1399 m.numberOfBroadcastMessagesDropped.Inc()
1400 return
1401 }
1402
1403 kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID(), Deleted: deleted, UpdateTimeMillis: updateTimeMillis(updateTime)}
1404 pairData, err := kvPair.Marshal()
1405 if err != nil {
1406 level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err)
1407 m.numberOfBroadcastMessagesDropped.Inc()
1408 return
1409 }
1410
1411 mergedChanges := change.MergeContent()
1412 m.addSentMessage(Message{
1413 Time: time.Now(),
1414 Size: len(pairData),
1415 Pair: kvPair,
1416 Version: version,
1417 Changes: mergedChanges,
1418 })
1419
1420 l := len(pairData)
1421 b := ringBroadcast{
1422 key: key,
1423 content: mergedChanges,
1424 version: version,
1425 msg: pairData,
1426 finished: func(ringBroadcast) {
1427 m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
1428 },
1429 logger: m.logger,
1430 }
1431
1432 m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))
1433
1434 if locallyGenerated {
1435 m.localBroadcasts.QueueBroadcast(b)
1436 } else {
1437 m.gossipBroadcasts.QueueBroadcast(b)
1438 }
1439}
1440
1441// NodeMeta is method from Memberlist Delegate interface
1442func (m *KV) NodeMeta(_ int) []byte {

Callers 5

DeleteMethod · 0.95
CASMethod · 0.95
processValueUpdateMethod · 0.95
MergeRemoteStateMethod · 0.95

Calls 10

MarshalMethod · 0.95
addSentMessageMethod · 0.95
updateTimeMillisFunction · 0.85
StateMethod · 0.65
EncodeMethod · 0.65
CodecIDMethod · 0.65
MergeContentMethod · 0.65
AddMethod · 0.65
LogMethod · 0.45
ErrorMethod · 0.45

Tested by 1