(t *testing.T)
| 4040 | } |
| 4041 | |
| 4042 | func TestJetStreamManagement_DeleteMsg(t *testing.T) { |
| 4043 | s := RunBasicJetStreamServer() |
| 4044 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 4045 | |
| 4046 | nc, js := jsClient(t, s) |
| 4047 | defer nc.Close() |
| 4048 | |
| 4049 | var err error |
| 4050 | |
| 4051 | _, err = js.AddStream(&nats.StreamConfig{ |
| 4052 | Name: "foo", |
| 4053 | Subjects: []string{"foo.A", "foo.B", "foo.C"}, |
| 4054 | }) |
| 4055 | if err != nil { |
| 4056 | t.Fatalf("Unexpected error: %v", err) |
| 4057 | } |
| 4058 | |
| 4059 | for i := 0; i < 5; i++ { |
| 4060 | js.Publish("foo.A", []byte("A")) |
| 4061 | js.Publish("foo.B", []byte("B")) |
| 4062 | js.Publish("foo.C", []byte("C")) |
| 4063 | } |
| 4064 | |
| 4065 | si, err := js.StreamInfo("foo") |
| 4066 | if err != nil { |
| 4067 | t.Fatal(err) |
| 4068 | } |
| 4069 | var total uint64 = 15 |
| 4070 | if si.State.Msgs != total { |
| 4071 | t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs) |
| 4072 | } |
| 4073 | |
| 4074 | expected := 5 |
| 4075 | msgs := make([]*nats.Msg, 0) |
| 4076 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 4077 | defer cancel() |
| 4078 | |
| 4079 | sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) { |
| 4080 | msgs = append(msgs, msg) |
| 4081 | if len(msgs) == expected { |
| 4082 | cancel() |
| 4083 | } |
| 4084 | }) |
| 4085 | if err != nil { |
| 4086 | t.Fatal(err) |
| 4087 | } |
| 4088 | <-ctx.Done() |
| 4089 | sub.Unsubscribe() |
| 4090 | |
| 4091 | got := len(msgs) |
| 4092 | if got != expected { |
| 4093 | t.Fatalf("Expected %d, got %d", expected, got) |
| 4094 | } |
| 4095 | |
| 4096 | msg := msgs[0] |
| 4097 | meta, err := msg.Metadata() |
| 4098 | if err != nil { |
| 4099 | t.Fatal(err) |
nothing calls this directly
no test coverage detected