MCPcopy
hub / github.com/grafana/dskit / TestNotifyMsgResendsOnlyChanges

Function TestNotifyMsgResendsOnlyChanges

kv/memberlist/memberlist_client_test.go:1720–1785  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1718}
1719
1720func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
1721 codec := dataCodec{}
1722
1723 var cfg KVConfig
1724 flagext.DefaultValues(&cfg)
1725 cfg.TCPTransport.BindAddrs = getLocalhostAddrs()
1726 // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor.
1727 cfg.RetransmitMult = 1
1728 cfg.Codecs = append(cfg.Codecs, codec)
1729
1730 kv := NewKV(cfg, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry())
1731 require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
1732 defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck
1733
1734 client, err := NewClient(kv, codec)
1735 require.NoError(t, err)
1736
1737 // No broadcast messages from KV at the beginning.
1738 require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))
1739
1740 now := time.Now()
1741
1742 require.NoError(t, client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) {
1743 d := getOrCreateData(in)
1744 d.Members["a"] = member{Timestamp: now.Unix(), State: JOINING}
1745 d.Members["b"] = member{Timestamp: now.Unix(), State: JOINING}
1746 return d, true, nil
1747 }))
1748
1749 // Check that new instance is broadcasted about just once.
1750 assert.Equal(t, 1, len(kv.GetBroadcasts(0, math.MaxInt32)))
1751 require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))
1752
1753 kv.NotifyMsg(marshalKeyValuePair(t, key, codec, &data{
1754 Members: map[string]member{
1755 "a": {Timestamp: now.Unix() - 5, State: ACTIVE},
1756 "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
1757 "c": {Timestamp: now.Unix(), State: ACTIVE},
1758 }}))
1759
1760 // Wait until KV update has been processed.
1761 time.Sleep(time.Millisecond * 100)
1762
1763 // Check two things here:
1764 // 1) state of value in KV store
1765 // 2) broadcast message only has changed members
1766
1767 d := getData(t, client, key)
1768 assert.Equal(t, &data{
1769 Members: map[string]member{
1770 "a": {Timestamp: now.Unix(), State: JOINING, Tokens: []uint32{}}, // unchanged, timestamp too old
1771 "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}},
1772 "c": {Timestamp: now.Unix(), State: ACTIVE, Tokens: []uint32{}},
1773 }}, d)
1774
1775 bs := kv.GetBroadcasts(0, math.MaxInt32)
1776 require.Equal(t, 1, len(bs))
1777

Callers

nothing calls this directly

Calls 15

GetBroadcastsMethod · 0.95
CASMethod · 0.95
NotifyMsgMethod · 0.95
DefaultValuesFunction · 0.92
StartAndAwaitRunningFunction · 0.92
StopAndAwaitTerminatedFunction · 0.92
getLocalhostAddrsFunction · 0.85
NewKVFunction · 0.85
getOrCreateDataFunction · 0.85
marshalKeyValuePairFunction · 0.85
getDataFunction · 0.85

Tested by

no test coverage detected