(t *testing.T)
| 858 | } |
| 859 | |
| 860 | func TestPublishTimeout(t *testing.T) { |
| 861 | srv := RunBasicJetStreamServer() |
| 862 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 863 | nc, err := nats.Connect(srv.ClientURL()) |
| 864 | if err != nil { |
| 865 | t.Fatalf("Unexpected error: %v", err) |
| 866 | } |
| 867 | |
| 868 | js, err := jetstream.New(nc, jetstream.WithDefaultTimeout(200*time.Millisecond)) |
| 869 | if err != nil { |
| 870 | t.Fatalf("Unexpected error: %v", err) |
| 871 | } |
| 872 | defer nc.Close() |
| 873 | |
| 874 | // create stream with no ack to force timeout |
| 875 | _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{ |
| 876 | Name: "foo", |
| 877 | Subjects: []string{"FOO.*"}, |
| 878 | NoAck: true, |
| 879 | }) |
| 880 | if err != nil { |
| 881 | t.Fatalf("Unexpected error: %v", err) |
| 882 | } |
| 883 | |
| 884 | now := time.Now() |
| 885 | _, err = js.Publish(context.Background(), "FOO.1", []byte("msg")) |
| 886 | if !errors.Is(err, context.DeadlineExceeded) { |
| 887 | t.Fatalf("Expected deadline exceeded error; got: %v", err) |
| 888 | } |
| 889 | since := time.Since(now) |
| 890 | if since < 200*time.Millisecond || since > 500*time.Millisecond { |
| 891 | t.Fatalf("Expected timeout to be around 200ms; got: %v", since) |
| 892 | } |
| 893 | } |
| 894 | |
| 895 | func TestPublishMsgAsync(t *testing.T) { |
| 896 | type publishConfig struct { |
nothing calls this directly
no test coverage detected