(t *testing.T)
| 665 | } |
| 666 | |
| 667 | func TestPublishWithTTL(t *testing.T) { |
| 668 | srv := RunBasicJetStreamServer() |
| 669 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 670 | nc, err := nats.Connect(srv.ClientURL()) |
| 671 | if err != nil { |
| 672 | t.Fatalf("Unexpected error: %v", err) |
| 673 | } |
| 674 | |
| 675 | js, err := jetstream.New(nc) |
| 676 | if err != nil { |
| 677 | t.Fatalf("Unexpected error: %v", err) |
| 678 | } |
| 679 | defer nc.Close() |
| 680 | |
| 681 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 682 | defer cancel() |
| 683 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 684 | Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64, AllowMsgTTL: true}) |
| 685 | if err != nil { |
| 686 | t.Fatalf("Unexpected error: %v", err) |
| 687 | } |
| 688 | |
| 689 | ack, err := js.Publish(ctx, "FOO.1", []byte("msg"), jetstream.WithMsgTTL(1*time.Second)) |
| 690 | if err != nil { |
| 691 | t.Fatalf("Unexpected error: %v", err) |
| 692 | } |
| 693 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 694 | if err != nil { |
| 695 | t.Fatalf("Unexpected error: %v", err) |
| 696 | } |
| 697 | if ttl := gotMsg.Header.Get("Nats-TTL"); ttl != "1s" { |
| 698 | t.Fatalf("Expected message to have TTL header set to 1s; got: %s", ttl) |
| 699 | } |
| 700 | time.Sleep(1500 * time.Millisecond) |
| 701 | _, err = stream.GetMsg(ctx, ack.Sequence) |
| 702 | if !errors.Is(err, jetstream.ErrMsgNotFound) { |
| 703 | t.Fatalf("Expected not found error; got: %v", err) |
| 704 | } |
| 705 | } |
| 706 | |
| 707 | func TestMsgDeleteMarkerMaxAge(t *testing.T) { |
| 708 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected