MCPcopy
hub / github.com/grafana/dskit / processValueUpdate

Method processValueUpdate

kv/memberlist/memberlist_client.go:1503–1543  ·  view source on GitHub ↗
(workerCh <-chan valueUpdate, key string)

Source from the content-addressed store, hash-verified

1501}
1502
1503func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
1504 for {
1505 select {
1506 case update := <-workerCh:
1507 // we have a value update! Let's merge it with our current version for given key
1508 mod, version, deleted, updated, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime)
1509
1510 changes := []string(nil)
1511 if mod != nil {
1512 changes = mod.MergeContent()
1513 }
1514
1515 m.addReceivedMessage(Message{
1516 Time: time.Now(),
1517 Size: update.messageSize,
1518 Pair: KeyValuePair{
1519 Key: key,
1520 Value: update.value,
1521 Codec: update.codec.CodecID(),
1522 Deleted: deleted,
1523 UpdateTimeMillis: updateTimeMillis(updated),
1524 },
1525 Version: version,
1526 Changes: changes,
1527 })
1528
1529 if err != nil {
1530 level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err)
1531 } else if version > 0 {
1532 m.notifyWatchers(key)
1533
1534 // Don't resend original message, but only changes, if any.
1535 m.broadcastNewValue(key, mod, version, update.codec, false, deleted, updated)
1536 }
1537
1538 case <-m.shutdown:
1539 // stop running on shutdown
1540 return
1541 }
1542 }
1543}
1544
1545// GetBroadcasts is method from Memberlist Delegate interface
1546// It returns all pending broadcasts (within the size limit)

Callers 1

getKeyWorkerChannelMethod · 0.95

Calls 9

mergeBytesValueForKeyMethod · 0.95
addReceivedMessageMethod · 0.95
notifyWatchersMethod · 0.95
broadcastNewValueMethod · 0.95
updateTimeMillisFunction · 0.85
MergeContentMethod · 0.65
CodecIDMethod · 0.65
LogMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected