| 1388 | } |
| 1389 | |
| 1390 | func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { |
| 1391 | if locallyGenerated && m.State() != services.Running { |
| 1392 | level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) |
| 1393 | return |
| 1394 | } |
| 1395 | data, err := codec.Encode(change) |
| 1396 | |
| 1397 | if err != nil { |
| 1398 | level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err) |
| 1399 | m.numberOfBroadcastMessagesDropped.Inc() |
| 1400 | return |
| 1401 | } |
| 1402 | |
| 1403 | kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID(), Deleted: deleted, UpdateTimeMillis: updateTimeMillis(updateTime)} |
| 1404 | pairData, err := kvPair.Marshal() |
| 1405 | if err != nil { |
| 1406 | level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err) |
| 1407 | m.numberOfBroadcastMessagesDropped.Inc() |
| 1408 | return |
| 1409 | } |
| 1410 | |
| 1411 | mergedChanges := change.MergeContent() |
| 1412 | m.addSentMessage(Message{ |
| 1413 | Time: time.Now(), |
| 1414 | Size: len(pairData), |
| 1415 | Pair: kvPair, |
| 1416 | Version: version, |
| 1417 | Changes: mergedChanges, |
| 1418 | }) |
| 1419 | |
| 1420 | l := len(pairData) |
| 1421 | b := ringBroadcast{ |
| 1422 | key: key, |
| 1423 | content: mergedChanges, |
| 1424 | version: version, |
| 1425 | msg: pairData, |
| 1426 | finished: func(ringBroadcast) { |
| 1427 | m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) |
| 1428 | }, |
| 1429 | logger: m.logger, |
| 1430 | } |
| 1431 | |
| 1432 | m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) |
| 1433 | |
| 1434 | if locallyGenerated { |
| 1435 | m.localBroadcasts.QueueBroadcast(b) |
| 1436 | } else { |
| 1437 | m.gossipBroadcasts.QueueBroadcast(b) |
| 1438 | } |
| 1439 | } |
| 1440 | |
| 1441 | // NodeMeta is method from Memberlist Delegate interface |
| 1442 | func (m *KV) NodeMeta(_ int) []byte { |