MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncClearStall

Function TestPublishAsyncClearStall

jetstream/test/publish_test.go:1989–2038  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1987}
1988
1989func TestPublishAsyncClearStall(t *testing.T) {
1990 s := RunBasicJetStreamServer()
1991 defer shutdownJSServerAndRemoveStorage(t, s)
1992
1993 nc, err := nats.Connect(s.ClientURL())
1994 if err != nil {
1995 t.Fatalf("Unexpected error: %v", err)
1996 }
1997
1998 js, err := jetstream.New(nc,
1999 jetstream.WithPublishAsyncTimeout(500*time.Millisecond),
2000 jetstream.WithPublishAsyncMaxPending(100))
2001 if err != nil {
2002 t.Fatalf("Unexpected error: %v", err)
2003 }
2004 defer nc.Close()
2005
2006 // use stream with no acks to test stalling
2007 _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true})
2008 if err != nil {
2009 t.Fatalf("Unexpected error: %v", err)
2010 }
2011
2012 for range 100 {
2013 _, err := js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithStallWait(1*time.Nanosecond))
2014 if err != nil {
2015 t.Fatalf("Unexpected error: %v", err)
2016 }
2017 }
2018 // after publishing 100 messages, next one should fail with ErrTooManyStalledMsgs
2019 _, err = js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithStallWait(50*time.Millisecond))
2020 if !errors.Is(err, jetstream.ErrTooManyStalledMsgs) {
2021 t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err)
2022 }
2023
2024 // after publish timeout all pending messages should be cleared
2025 // and we should be able to publish again
2026 select {
2027 case <-js.PublishAsyncComplete():
2028 case <-time.After(2 * time.Second):
2029 t.Fatalf("Did not receive completion signal")
2030 }
2031
2032 if _, err = js.PublishAsync("FOO.A", []byte("hello")); err != nil {
2033 t.Fatalf("Unexpected error: %v", err)
2034 }
2035 if js.PublishAsyncPending() != 1 {
2036 t.Fatalf("Expected 1 pending message; got: %d", js.PublishAsyncPending())
2037 }
2038}
2039
2040func TestPublishWithScheduleAt(t *testing.T) {
2041 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 14

NewFunction · 0.92
WithPublishAsyncTimeoutFunction · 0.92
WithStallWaitFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
PublishAsyncMethod · 0.65
PublishAsyncCompleteMethod · 0.65
PublishAsyncPendingMethod · 0.65

Tested by

no test coverage detected