MCPcopy
hub / github.com/nats-io/nats.go / TestPublishWithTTL

Function TestPublishWithTTL

jetstream/test/publish_test.go:667–705  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

665}
666
667func TestPublishWithTTL(t *testing.T) {
668 srv := RunBasicJetStreamServer()
669 defer shutdownJSServerAndRemoveStorage(t, srv)
670 nc, err := nats.Connect(srv.ClientURL())
671 if err != nil {
672 t.Fatalf("Unexpected error: %v", err)
673 }
674
675 js, err := jetstream.New(nc)
676 if err != nil {
677 t.Fatalf("Unexpected error: %v", err)
678 }
679 defer nc.Close()
680
681 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
682 defer cancel()
683 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
684 Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64, AllowMsgTTL: true})
685 if err != nil {
686 t.Fatalf("Unexpected error: %v", err)
687 }
688
689 ack, err := js.Publish(ctx, "FOO.1", []byte("msg"), jetstream.WithMsgTTL(1*time.Second))
690 if err != nil {
691 t.Fatalf("Unexpected error: %v", err)
692 }
693 gotMsg, err := stream.GetMsg(ctx, ack.Sequence)
694 if err != nil {
695 t.Fatalf("Unexpected error: %v", err)
696 }
697 if ttl := gotMsg.Header.Get("Nats-TTL"); ttl != "1s" {
698 t.Fatalf("Expected message to have TTL header set to 1s; got: %s", ttl)
699 }
700 time.Sleep(1500 * time.Millisecond)
701 _, err = stream.GetMsg(ctx, ack.Sequence)
702 if !errors.Is(err, jetstream.ErrMsgNotFound) {
703 t.Fatalf("Expected not found error; got: %v", err)
704 }
705}
706
707func TestMsgDeleteMarkerMaxAge(t *testing.T) {
708 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 12

NewFunction · 0.92
WithMsgTTLFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
PublishMethod · 0.65
GetMsgMethod · 0.65
GetMethod · 0.65
CloseMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected