(t *testing.T)
| 1769 | } |
| 1770 | |
| 1771 | func TestListKeysFromPurgedStream(t *testing.T) { |
| 1772 | s := RunBasicJetStreamServer() |
| 1773 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1774 | |
| 1775 | nc, err := nats.Connect(s.ClientURL()) |
| 1776 | if err != nil { |
| 1777 | t.Fatalf("Error on connect: %v", err) |
| 1778 | } |
| 1779 | defer nc.Close() |
| 1780 | |
| 1781 | js, err := nc.JetStream(nats.MaxWait(100 * time.Millisecond)) |
| 1782 | if err != nil { |
| 1783 | t.Fatalf("Error getting jetstream context: %v", err) |
| 1784 | } |
| 1785 | |
| 1786 | kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "A"}) |
| 1787 | if err != nil { |
| 1788 | t.Fatalf("Error creating kv: %v", err) |
| 1789 | } |
| 1790 | |
| 1791 | for i := range 10000 { |
| 1792 | if _, err := kv.Put(fmt.Sprintf("key-%d", i), []byte("val")); err != nil { |
| 1793 | t.Fatalf("Error putting key: %v", err) |
| 1794 | } |
| 1795 | } |
| 1796 | |
| 1797 | // purge the stream after a bit |
| 1798 | go func() { |
| 1799 | time.Sleep(10 * time.Millisecond) |
| 1800 | if err := js.PurgeStream("KV_A"); err != nil { |
| 1801 | t.Logf("Error purging stream: %v", err) |
| 1802 | } |
| 1803 | }() |
| 1804 | keys, err := kv.ListKeys() |
| 1805 | if err != nil { |
| 1806 | t.Fatalf("Error listing keys: %v", err) |
| 1807 | } |
| 1808 | |
| 1809 | // there should not be a deadlock here |
| 1810 | for { |
| 1811 | select { |
| 1812 | case _, ok := <-keys.Keys(): |
| 1813 | if !ok { |
| 1814 | return |
| 1815 | } |
| 1816 | case <-time.After(5 * time.Second): |
| 1817 | t.Fatalf("Timeout waiting for keys") |
| 1818 | } |
| 1819 | } |
| 1820 | } |
| 1821 | |
| 1822 | func TestKeyValueWatcherStopTimer(t *testing.T) { |
| 1823 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected