(t *testing.T)
| 1718 | } |
| 1719 | |
| 1720 | func TestNotifyMsgResendsOnlyChanges(t *testing.T) { |
| 1721 | codec := dataCodec{} |
| 1722 | |
| 1723 | var cfg KVConfig |
| 1724 | flagext.DefaultValues(&cfg) |
| 1725 | cfg.TCPTransport.BindAddrs = getLocalhostAddrs() |
| 1726 | // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. |
| 1727 | cfg.RetransmitMult = 1 |
| 1728 | cfg.Codecs = append(cfg.Codecs, codec) |
| 1729 | |
| 1730 | kv := NewKV(cfg, log.NewNopLogger(), &staticDNSProviderMock{}, prometheus.NewPedanticRegistry()) |
| 1731 | require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) |
| 1732 | defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck |
| 1733 | |
| 1734 | client, err := NewClient(kv, codec) |
| 1735 | require.NoError(t, err) |
| 1736 | |
| 1737 | // No broadcast messages from KV at the beginning. |
| 1738 | require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) |
| 1739 | |
| 1740 | now := time.Now() |
| 1741 | |
| 1742 | require.NoError(t, client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) { |
| 1743 | d := getOrCreateData(in) |
| 1744 | d.Members["a"] = member{Timestamp: now.Unix(), State: JOINING} |
| 1745 | d.Members["b"] = member{Timestamp: now.Unix(), State: JOINING} |
| 1746 | return d, true, nil |
| 1747 | })) |
| 1748 | |
| 1749 | // Check that new instance is broadcasted about just once. |
| 1750 | assert.Equal(t, 1, len(kv.GetBroadcasts(0, math.MaxInt32))) |
| 1751 | require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) |
| 1752 | |
| 1753 | kv.NotifyMsg(marshalKeyValuePair(t, key, codec, &data{ |
| 1754 | Members: map[string]member{ |
| 1755 | "a": {Timestamp: now.Unix() - 5, State: ACTIVE}, |
| 1756 | "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, |
| 1757 | "c": {Timestamp: now.Unix(), State: ACTIVE}, |
| 1758 | }})) |
| 1759 | |
| 1760 | // Wait until KV update has been processed. |
| 1761 | time.Sleep(time.Millisecond * 100) |
| 1762 | |
| 1763 | // Check two things here: |
| 1764 | // 1) state of value in KV store |
| 1765 | // 2) broadcast message only has changed members |
| 1766 | |
| 1767 | d := getData(t, client, key) |
| 1768 | assert.Equal(t, &data{ |
| 1769 | Members: map[string]member{ |
| 1770 | "a": {Timestamp: now.Unix(), State: JOINING, Tokens: []uint32{}}, // unchanged, timestamp too old |
| 1771 | "b": {Timestamp: now.Unix() + 5, State: ACTIVE, Tokens: []uint32{1, 2, 3}}, |
| 1772 | "c": {Timestamp: now.Unix(), State: ACTIVE, Tokens: []uint32{}}, |
| 1773 | }}, d) |
| 1774 | |
| 1775 | bs := kv.GetBroadcasts(0, math.MaxInt32) |
| 1776 | require.Equal(t, 1, len(bs)) |
| 1777 |
nothing calls this directly
no test coverage detected