(t *testing.T)
| 744 | } |
| 745 | |
| 746 | func TestPublishAsyncWithTTL(t *testing.T) { |
| 747 | srv := RunBasicJetStreamServer() |
| 748 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 749 | nc, err := nats.Connect(srv.ClientURL()) |
| 750 | if err != nil { |
| 751 | t.Fatalf("Unexpected error: %v", err) |
| 752 | } |
| 753 | |
| 754 | js, err := jetstream.New(nc) |
| 755 | if err != nil { |
| 756 | t.Fatalf("Unexpected error: %v", err) |
| 757 | } |
| 758 | defer nc.Close() |
| 759 | |
| 760 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 761 | defer cancel() |
| 762 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 763 | Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64, AllowMsgTTL: true}) |
| 764 | if err != nil { |
| 765 | t.Fatalf("Unexpected error: %v", err) |
| 766 | } |
| 767 | |
| 768 | paf, err := js.PublishAsync("FOO.1", []byte("msg"), jetstream.WithMsgTTL(1*time.Second)) |
| 769 | if err != nil { |
| 770 | t.Fatalf("Unexpected error: %v", err) |
| 771 | } |
| 772 | var ack *jetstream.PubAck |
| 773 | select { |
| 774 | case ack = <-paf.Ok(): |
| 775 | case <-time.After(5 * time.Second): |
| 776 | t.Fatalf("Did not receive ack") |
| 777 | } |
| 778 | |
| 779 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 780 | if err != nil { |
| 781 | t.Fatalf("Unexpected error: %v", err) |
| 782 | } |
| 783 | if ttl := gotMsg.Header.Get("Nats-TTL"); ttl != "1s" { |
| 784 | t.Fatalf("Expected message to have TTL header set to 1s; got: %s", ttl) |
| 785 | } |
| 786 | time.Sleep(1500 * time.Millisecond) |
| 787 | _, err = stream.GetMsg(ctx, ack.Sequence) |
| 788 | if !errors.Is(err, jetstream.ErrMsgNotFound) { |
| 789 | t.Fatalf("Expected not found error; got: %v", err) |
| 790 | } |
| 791 | } |
| 792 | |
| 793 | func TestPublish(t *testing.T) { |
| 794 | // Only very basic test cases, as most use cases are tested in TestPublishMsg |
nothing calls this directly
no test coverage detected