MCPcopy
hub / github.com/nats-io/nats.go / TestListKeysFromPurgedStream

Function TestListKeysFromPurgedStream

test/kv_test.go:1771–1820  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1769}
1770
1771func TestListKeysFromPurgedStream(t *testing.T) {
1772 s := RunBasicJetStreamServer()
1773 defer shutdownJSServerAndRemoveStorage(t, s)
1774
1775 nc, err := nats.Connect(s.ClientURL())
1776 if err != nil {
1777 t.Fatalf("Error on connect: %v", err)
1778 }
1779 defer nc.Close()
1780
1781 js, err := nc.JetStream(nats.MaxWait(100 * time.Millisecond))
1782 if err != nil {
1783 t.Fatalf("Error getting jetstream context: %v", err)
1784 }
1785
1786 kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "A"})
1787 if err != nil {
1788 t.Fatalf("Error creating kv: %v", err)
1789 }
1790
1791 for i := range 10000 {
1792 if _, err := kv.Put(fmt.Sprintf("key-%d", i), []byte("val")); err != nil {
1793 t.Fatalf("Error putting key: %v", err)
1794 }
1795 }
1796
1797 // purge the stream after a bit
1798 go func() {
1799 time.Sleep(10 * time.Millisecond)
1800 if err := js.PurgeStream("KV_A"); err != nil {
1801 t.Logf("Error purging stream: %v", err)
1802 }
1803 }()
1804 keys, err := kv.ListKeys()
1805 if err != nil {
1806 t.Fatalf("Error listing keys: %v", err)
1807 }
1808
1809 // there should not be a deadlock here
1810 for {
1811 select {
1812 case _, ok := <-keys.Keys():
1813 if !ok {
1814 return
1815 }
1816 case <-time.After(5 * time.Second):
1817 t.Fatalf("Timeout waiting for keys")
1818 }
1819 }
1820}
1821
1822func TestKeyValueWatcherStopTimer(t *testing.T) {
1823 s := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 11

ConnectMethod · 0.80
FatalfMethod · 0.80
JetStreamMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateKeyValueMethod · 0.65
PutMethod · 0.65
PurgeStreamMethod · 0.65
ListKeysMethod · 0.65
KeysMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected