(t *testing.T)
| 4159 | } |
| 4160 | |
| 4161 | func TestJetStreamManagement_SecureDeleteMsg(t *testing.T) { |
| 4162 | s := RunBasicJetStreamServer() |
| 4163 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 4164 | |
| 4165 | nc, js := jsClient(t, s) |
| 4166 | defer nc.Close() |
| 4167 | |
| 4168 | var err error |
| 4169 | |
| 4170 | _, err = js.AddStream(&nats.StreamConfig{ |
| 4171 | Name: "foo", |
| 4172 | Subjects: []string{"foo.A", "foo.B", "foo.C"}, |
| 4173 | }) |
| 4174 | if err != nil { |
| 4175 | t.Fatalf("Unexpected error: %v", err) |
| 4176 | } |
| 4177 | |
| 4178 | for i := 0; i < 5; i++ { |
| 4179 | js.Publish("foo.A", []byte("A")) |
| 4180 | js.Publish("foo.B", []byte("B")) |
| 4181 | js.Publish("foo.C", []byte("C")) |
| 4182 | } |
| 4183 | |
| 4184 | si, err := js.StreamInfo("foo") |
| 4185 | if err != nil { |
| 4186 | t.Fatal(err) |
| 4187 | } |
| 4188 | var total uint64 = 15 |
| 4189 | if si.State.Msgs != total { |
| 4190 | t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs) |
| 4191 | } |
| 4192 | |
| 4193 | expected := 5 |
| 4194 | msgs := make([]*nats.Msg, 0) |
| 4195 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 4196 | defer cancel() |
| 4197 | |
| 4198 | sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) { |
| 4199 | msgs = append(msgs, msg) |
| 4200 | if len(msgs) == expected { |
| 4201 | cancel() |
| 4202 | } |
| 4203 | }) |
| 4204 | if err != nil { |
| 4205 | t.Fatal(err) |
| 4206 | } |
| 4207 | <-ctx.Done() |
| 4208 | sub.Unsubscribe() |
| 4209 | |
| 4210 | got := len(msgs) |
| 4211 | if got != expected { |
| 4212 | t.Fatalf("Expected %d, got %d", expected, got) |
| 4213 | } |
| 4214 | |
| 4215 | msg := msgs[0] |
| 4216 | meta, err := msg.Metadata() |
| 4217 | if err != nil { |
| 4218 | t.Fatal(err) |
nothing calls this directly
no test coverage detected