MCPcopy
hub / github.com/nats-io/nats.go / TestMessageDetails

Function TestMessageDetails

jetstream/test/message_test.go:27–87  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

25)
26
27func TestMessageDetails(t *testing.T) {
28 srv := RunBasicJetStreamServer()
29 defer shutdownJSServerAndRemoveStorage(t, srv)
30 nc, err := nats.Connect(srv.ClientURL())
31 if err != nil {
32 t.Fatalf("Unexpected error: %v", err)
33 }
34
35 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
36 defer cancel()
37 js, err := jetstream.New(nc)
38 if err != nil {
39 t.Fatalf("Unexpected error: %v", err)
40 }
41 defer nc.Close()
42
43 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
44 if err != nil {
45 t.Fatalf("Unexpected error: %v", err)
46 }
47 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
48 Durable: "cons",
49 AckPolicy: jetstream.AckExplicitPolicy,
50 Description: "test consumer",
51 })
52 if err != nil {
53 t.Fatalf("Unexpected error: %v", err)
54 }
55
56 if _, err := js.Publish(ctx, "FOO.1", []byte("msg"), jetstream.WithMsgID("123")); err != nil {
57 t.Fatalf("Unexpected error: %v", err)
58 }
59 msgs, err := c.Fetch(1)
60 if err != nil {
61 t.Fatalf("Unexpected error: %v", err)
62 }
63
64 msg := <-msgs.Messages()
65 if msg == nil {
66 t.Fatalf("No messages available")
67 }
68 if err := msgs.Error(); err != nil {
69 t.Fatalf("unexpected error during fetch: %v", err)
70 }
71 if string(msg.Data()) != "msg" {
72 t.Fatalf("Invalid message body; want: 'msg'; got: %q", string(msg.Data()))
73 }
74 metadata, err := msg.Metadata()
75 if err != nil {
76 t.Fatalf("Unexpected error: %v", err)
77 }
78 if metadata.Consumer != "cons" || metadata.Stream != "foo" {
79 t.Fatalf("Invalid message metadata: %v", metadata)
80 }
81 if val, ok := msg.Headers()["Nats-Msg-Id"]; !ok || val[0] != "123" {
82 t.Fatalf("Invalid message headers: %v", msg.Headers())
83 }
84 if msg.Subject() != "FOO.1" {

Callers

nothing calls this directly

Calls 15

New · Function · 0.92
WithMsgID · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
Publish · Method · 0.65
Fetch · Method · 0.65
Messages · Method · 0.65
Error · Method · 0.65

Tested by

no test coverage detected