MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamMsgSubjectRewrite

Function TestJetStreamMsgSubjectRewrite

test/js_test.go:9269–9308  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

9267}
9268
9269func TestJetStreamMsgSubjectRewrite(t *testing.T) {
9270 s := RunBasicJetStreamServer()
9271 defer shutdownJSServerAndRemoveStorage(t, s)
9272
9273 nc, js := jsClient(t, s)
9274 defer nc.Close()
9275
9276 if _, err := js.AddStream(&nats.StreamConfig{
9277 Name: "TEST",
9278 Subjects: []string{"foo"},
9279 }); err != nil {
9280 t.Fatalf("Error adding stream: %v", err)
9281 }
9282
9283 sub, err := nc.SubscribeSync(nats.NewInbox())
9284 if err != nil {
9285 t.Fatalf("Error on subscribe: %v", err)
9286 }
9287 if _, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
9288 DeliverSubject: sub.Subject,
9289 DeliverPolicy: nats.DeliverAllPolicy,
9290 }); err != nil {
9291 t.Fatalf("Error adding consumer: %v", err)
9292 }
9293
9294 if _, err := js.Publish("foo", []byte("msg")); err != nil {
9295 t.Fatalf("Error on publish: %v", err)
9296 }
9297
9298 msg, err := sub.NextMsg(time.Second)
9299 if err != nil {
9300 t.Fatalf("Did not get message: %v", err)
9301 }
9302 if msg.Subject != "foo" {
9303 t.Fatalf("Subject should be %q, got %q", "foo", msg.Subject)
9304 }
9305 if string(msg.Data) != "msg" {
9306 t.Fatalf("Unexpected data: %q", msg.Data)
9307 }
9308}
9309
9310func TestJetStreamPullSubscribeFetchContext(t *testing.T) {
9311 withJSCluster(t, "PULLCTX", 3, testJetStreamFetchContext)

Callers

nothing calls this directly

Calls 11

FatalfMethod · 0.80
NewInboxMethod · 0.80
NextMsgMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
SubscribeSyncMethod · 0.65
AddConsumerMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected