(t *testing.T)
| 2080 | } |
| 2081 | |
| 2082 | func TestKeyValueKeysDeduplicate(t *testing.T) { |
| 2083 | s := RunBasicJetStreamServer() |
| 2084 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 2085 | |
| 2086 | nc, js := jsClient(t, s) |
| 2087 | defer nc.Close() |
| 2088 | |
| 2089 | ctx := context.Background() |
| 2090 | kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST_KV", History: 5}) |
| 2091 | if err != nil { |
| 2092 | t.Fatalf("Error creating KV: %v", err) |
| 2093 | } |
| 2094 | |
| 2095 | for i := range 10 { |
| 2096 | key := fmt.Sprintf("key_%d", i) |
| 2097 | if _, err := kv.PutString(ctx, key, "initial"); err != nil { |
| 2098 | t.Fatalf("Error putting key %s: %v", key, err) |
| 2099 | } |
| 2100 | } |
| 2101 | |
| 2102 | done := make(chan bool) |
| 2103 | go func() { |
| 2104 | for { |
| 2105 | select { |
| 2106 | case <-done: |
| 2107 | return |
| 2108 | default: |
| 2109 | for i := range 5 { |
| 2110 | key := fmt.Sprintf("key_%d", i) |
| 2111 | if _, err := kv.PutString(ctx, key, "updated"); err != nil { |
| 2112 | if errors.Is(err, nats.ErrConnectionClosed) { |
| 2113 | return |
| 2114 | } |
| 2115 | t.Logf("Error updating key %s: %v", key, err) |
| 2116 | } |
| 2117 | } |
| 2118 | } |
| 2119 | } |
| 2120 | }() |
| 2121 | |
| 2122 | for range 20 { |
| 2123 | keys, err := kv.Keys(ctx) |
| 2124 | if err != nil { |
| 2125 | t.Fatalf("Error getting keys: %v", err) |
| 2126 | } |
| 2127 | |
| 2128 | seen := make(map[string]struct{}) |
| 2129 | for _, key := range keys { |
| 2130 | if _, exists := seen[key]; exists { |
| 2131 | t.Fatalf("Duplicate key found: %s", key) |
| 2132 | } |
| 2133 | seen[key] = struct{}{} |
| 2134 | } |
| 2135 | } |
| 2136 | |
| 2137 | close(done) |
| 2138 | } |
| 2139 |
nothing calls this directly
no test coverage detected