(t *testing.T)
| 1987 | } |
| 1988 | |
| 1989 | func TestPublishAsyncClearStall(t *testing.T) { |
| 1990 | s := RunBasicJetStreamServer() |
| 1991 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1992 | |
| 1993 | nc, err := nats.Connect(s.ClientURL()) |
| 1994 | if err != nil { |
| 1995 | t.Fatalf("Unexpected error: %v", err) |
| 1996 | } |
| 1997 | |
| 1998 | js, err := jetstream.New(nc, |
| 1999 | jetstream.WithPublishAsyncTimeout(500*time.Millisecond), |
| 2000 | jetstream.WithPublishAsyncMaxPending(100)) |
| 2001 | if err != nil { |
| 2002 | t.Fatalf("Unexpected error: %v", err) |
| 2003 | } |
| 2004 | defer nc.Close() |
| 2005 | |
| 2006 | // use stream with no acks to test stalling |
| 2007 | _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true}) |
| 2008 | if err != nil { |
| 2009 | t.Fatalf("Unexpected error: %v", err) |
| 2010 | } |
| 2011 | |
| 2012 | for range 100 { |
| 2013 | _, err := js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithStallWait(1*time.Nanosecond)) |
| 2014 | if err != nil { |
| 2015 | t.Fatalf("Unexpected error: %v", err) |
| 2016 | } |
| 2017 | } |
| 2018 | // after publishing 100 messages, next one should fail with ErrTooManyStalledMsgs |
| 2019 | _, err = js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithStallWait(50*time.Millisecond)) |
| 2020 | if !errors.Is(err, jetstream.ErrTooManyStalledMsgs) { |
| 2021 | t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err) |
| 2022 | } |
| 2023 | |
| 2024 | // after publish timeout all pending messages should be cleared |
| 2025 | // and we should be able to publish again |
| 2026 | select { |
| 2027 | case <-js.PublishAsyncComplete(): |
| 2028 | case <-time.After(2 * time.Second): |
| 2029 | t.Fatalf("Did not receive completion signal") |
| 2030 | } |
| 2031 | |
| 2032 | if _, err = js.PublishAsync("FOO.A", []byte("hello")); err != nil { |
| 2033 | t.Fatalf("Unexpected error: %v", err) |
| 2034 | } |
| 2035 | if js.PublishAsyncPending() != 1 { |
| 2036 | t.Fatalf("Expected 1 pending message; got: %d", js.PublishAsyncPending()) |
| 2037 | } |
| 2038 | } |
| 2039 | |
| 2040 | func TestPublishWithScheduleAt(t *testing.T) { |
| 2041 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected