MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamManagement_DeleteMsg

Function TestJetStreamManagement_DeleteMsg

test/js_test.go:4042–4159  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

4040}
4041
4042func TestJetStreamManagement_DeleteMsg(t *testing.T) {
4043 s := RunBasicJetStreamServer()
4044 defer shutdownJSServerAndRemoveStorage(t, s)
4045
4046 nc, js := jsClient(t, s)
4047 defer nc.Close()
4048
4049 var err error
4050
4051 _, err = js.AddStream(&nats.StreamConfig{
4052 Name: "foo",
4053 Subjects: []string{"foo.A", "foo.B", "foo.C"},
4054 })
4055 if err != nil {
4056 t.Fatalf("Unexpected error: %v", err)
4057 }
4058
4059 for i := 0; i < 5; i++ {
4060 js.Publish("foo.A", []byte("A"))
4061 js.Publish("foo.B", []byte("B"))
4062 js.Publish("foo.C", []byte("C"))
4063 }
4064
4065 si, err := js.StreamInfo("foo")
4066 if err != nil {
4067 t.Fatal(err)
4068 }
4069 var total uint64 = 15
4070 if si.State.Msgs != total {
4071 t.Errorf("Expected %d msgs, got: %d", total, si.State.Msgs)
4072 }
4073
4074 expected := 5
4075 msgs := make([]*nats.Msg, 0)
4076 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
4077 defer cancel()
4078
4079 sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) {
4080 msgs = append(msgs, msg)
4081 if len(msgs) == expected {
4082 cancel()
4083 }
4084 })
4085 if err != nil {
4086 t.Fatal(err)
4087 }
4088 <-ctx.Done()
4089 sub.Unsubscribe()
4090
4091 got := len(msgs)
4092 if got != expected {
4093 t.Fatalf("Expected %d, got %d", expected, got)
4094 }
4095
4096 msg := msgs[0]
4097 meta, err := msg.Metadata()
4098 if err != nil {
4099 t.Fatal(err)

Callers

nothing calls this directly

Calls 15

FatalfMethod · 0.80
ErrorfMethod · 0.80
UnsubscribeMethod · 0.80
NextMsgMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PublishMethod · 0.65
StreamInfoMethod · 0.65
SubscribeMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected