MCPcopy
hub / github.com/nats-io/nats.go / TestKeyValueKeysDeduplicate

Function TestKeyValueKeysDeduplicate

jetstream/test/kv_test.go:2082–2138  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2080}
2081
2082func TestKeyValueKeysDeduplicate(t *testing.T) {
2083 s := RunBasicJetStreamServer()
2084 defer shutdownJSServerAndRemoveStorage(t, s)
2085
2086 nc, js := jsClient(t, s)
2087 defer nc.Close()
2088
2089 ctx := context.Background()
2090 kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST_KV", History: 5})
2091 if err != nil {
2092 t.Fatalf("Error creating KV: %v", err)
2093 }
2094
2095 for i := range 10 {
2096 key := fmt.Sprintf("key_%d", i)
2097 if _, err := kv.PutString(ctx, key, "initial"); err != nil {
2098 t.Fatalf("Error putting key %s: %v", key, err)
2099 }
2100 }
2101
2102 done := make(chan bool)
2103 go func() {
2104 for {
2105 select {
2106 case <-done:
2107 return
2108 default:
2109 for i := range 5 {
2110 key := fmt.Sprintf("key_%d", i)
2111 if _, err := kv.PutString(ctx, key, "updated"); err != nil {
2112 if errors.Is(err, nats.ErrConnectionClosed) {
2113 return
2114 }
2115 t.Logf("Error updating key %s: %v", key, err)
2116 }
2117 }
2118 }
2119 }
2120 }()
2121
2122 for range 20 {
2123 keys, err := kv.Keys(ctx)
2124 if err != nil {
2125 t.Fatalf("Error getting keys: %v", err)
2126 }
2127
2128 seen := make(map[string]struct{})
2129 for _, key := range keys {
2130 if _, exists := seen[key]; exists {
2131 t.Fatalf("Duplicate key found: %s", key)
2132 }
2133 seen[key] = struct{}{}
2134 }
2135 }
2136
2137 close(done)
2138}
2139

Callers

nothing calls this directly

Calls 9

FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
CreateKeyValueMethod · 0.65
PutStringMethod · 0.65
KeysMethod · 0.65
CloseMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected