(workerCh <-chan valueUpdate, key string)
| 1501 | } |
| 1502 | |
| 1503 | func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { |
| 1504 | for { |
| 1505 | select { |
| 1506 | case update := <-workerCh: |
| 1507 | // we have a value update! Let's merge it with our current version for given key |
| 1508 | mod, version, deleted, updated, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) |
| 1509 | |
| 1510 | changes := []string(nil) |
| 1511 | if mod != nil { |
| 1512 | changes = mod.MergeContent() |
| 1513 | } |
| 1514 | |
| 1515 | m.addReceivedMessage(Message{ |
| 1516 | Time: time.Now(), |
| 1517 | Size: update.messageSize, |
| 1518 | Pair: KeyValuePair{ |
| 1519 | Key: key, |
| 1520 | Value: update.value, |
| 1521 | Codec: update.codec.CodecID(), |
| 1522 | Deleted: deleted, |
| 1523 | UpdateTimeMillis: updateTimeMillis(updated), |
| 1524 | }, |
| 1525 | Version: version, |
| 1526 | Changes: changes, |
| 1527 | }) |
| 1528 | |
| 1529 | if err != nil { |
| 1530 | level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err) |
| 1531 | } else if version > 0 { |
| 1532 | m.notifyWatchers(key) |
| 1533 | |
| 1534 | // Don't resend original message, but only changes, if any. |
| 1535 | m.broadcastNewValue(key, mod, version, update.codec, false, deleted, updated) |
| 1536 | } |
| 1537 | |
| 1538 | case <-m.shutdown: |
| 1539 | // stop running on shutdown |
| 1540 | return |
| 1541 | } |
| 1542 | } |
| 1543 | } |
| 1544 | |
| 1545 | // GetBroadcasts is a method from the Memberlist Delegate interface. |
| 1546 | // It returns all pending broadcasts (within the size limit). |
no test coverage detected