MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamPublishAsync

Function TestJetStreamPublishAsync

test/js_test.go:8052–8204  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

8050}
8051
8052func TestJetStreamPublishAsync(t *testing.T) {
8053 s := RunBasicJetStreamServer()
8054 defer shutdownJSServerAndRemoveStorage(t, s)
8055
8056 nc, js := jsClient(t, s)
8057 defer nc.Close()
8058
8059 // Make sure we get a proper failure when no stream is present.
8060 paf, err := js.PublishAsync("foo", []byte("Hello JS"))
8061 if err != nil {
8062 t.Fatalf("Unexpected error: %v", err)
8063 }
8064
8065 select {
8066 case <-paf.Ok():
8067 t.Fatalf("Did not expect to get PubAck")
8068 case err := <-paf.Err():
8069 if err != nats.ErrNoResponders {
8070 t.Fatalf("Expected a ErrNoResponders error, got %v", err)
8071 }
8072 // Should be able to get the message back to resend, etc.
8073 m := paf.Msg()
8074 if m == nil {
8075 t.Fatalf("Expected to be able to retrieve the message")
8076 }
8077 if m.Subject != "foo" || string(m.Data) != "Hello JS" {
8078 t.Fatalf("Wrong message: %+v", m)
8079 }
8080 case <-time.After(time.Second):
8081 t.Fatalf("Did not receive an error in time")
8082 }
8083
8084 // Now create a stream and expect a PubAck from <-OK().
8085 if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil {
8086 t.Fatalf("Unexpected error: %v", err)
8087 }
8088
8089 paf, err = js.PublishAsync("TEST", []byte("Hello JS ASYNC PUB"))
8090 if err != nil {
8091 t.Fatalf("Unexpected error: %v", err)
8092 }
8093
8094 select {
8095 case pa := <-paf.Ok():
8096 if pa.Stream != "TEST" || pa.Sequence != 1 {
8097 t.Fatalf("Bad PubAck: %+v", pa)
8098 }
8099 case err := <-paf.Err():
8100 t.Fatalf("Did not expect to get an error: %v", err)
8101 case <-time.After(time.Second):
8102 t.Fatalf("Did not receive a PubAck in time")
8103 }
8104
8105 errCh := make(chan error, 1)
8106
8107 // Make sure we can register an async err handler for these.
8108 errHandler := func(js nats.JetStream, originalMsg *nats.Msg, err error) {
8109 if originalMsg == nil {

Callers

nothing calls this directly

Calls 15

FatalfMethod · 0.80
JetStreamMethod · 0.80
ErrorfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
PublishAsyncMethod · 0.65
OkMethod · 0.65
ErrMethod · 0.65
MsgMethod · 0.65
AddStreamMethod · 0.65
PublishAsyncPendingMethod · 0.65

Tested by

no test coverage detected