(t *testing.T)
| 8050 | } |
| 8051 | |
| 8052 | func TestJetStreamPublishAsync(t *testing.T) { |
| 8053 | s := RunBasicJetStreamServer() |
| 8054 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8055 | |
| 8056 | nc, js := jsClient(t, s) |
| 8057 | defer nc.Close() |
| 8058 | |
| 8059 | // Make sure we get a proper failure when no stream is present. |
| 8060 | paf, err := js.PublishAsync("foo", []byte("Hello JS")) |
| 8061 | if err != nil { |
| 8062 | t.Fatalf("Unexpected error: %v", err) |
| 8063 | } |
| 8064 | |
| 8065 | select { |
| 8066 | case <-paf.Ok(): |
| 8067 | t.Fatalf("Did not expect to get PubAck") |
| 8068 | case err := <-paf.Err(): |
| 8069 | if err != nats.ErrNoResponders { |
| 8070 | t.Fatalf("Expected a ErrNoResponders error, got %v", err) |
| 8071 | } |
| 8072 | // Should be able to get the message back to resend, etc. |
| 8073 | m := paf.Msg() |
| 8074 | if m == nil { |
| 8075 | t.Fatalf("Expected to be able to retrieve the message") |
| 8076 | } |
| 8077 | if m.Subject != "foo" || string(m.Data) != "Hello JS" { |
| 8078 | t.Fatalf("Wrong message: %+v", m) |
| 8079 | } |
| 8080 | case <-time.After(time.Second): |
| 8081 | t.Fatalf("Did not receive an error in time") |
| 8082 | } |
| 8083 | |
| 8084 | // Now create a stream and expect a PubAck from <-paf.Ok(). |
| 8085 | if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST"}); err != nil { |
| 8086 | t.Fatalf("Unexpected error: %v", err) |
| 8087 | } |
| 8088 | |
| 8089 | paf, err = js.PublishAsync("TEST", []byte("Hello JS ASYNC PUB")) |
| 8090 | if err != nil { |
| 8091 | t.Fatalf("Unexpected error: %v", err) |
| 8092 | } |
| 8093 | |
| 8094 | select { |
| 8095 | case pa := <-paf.Ok(): |
| 8096 | if pa.Stream != "TEST" || pa.Sequence != 1 { |
| 8097 | t.Fatalf("Bad PubAck: %+v", pa) |
| 8098 | } |
| 8099 | case err := <-paf.Err(): |
| 8100 | t.Fatalf("Did not expect to get an error: %v", err) |
| 8101 | case <-time.After(time.Second): |
| 8102 | t.Fatalf("Did not receive a PubAck in time") |
| 8103 | } |
| 8104 | |
| 8105 | errCh := make(chan error, 1) |
| 8106 | |
| 8107 | // Make sure we can register an async err handler for these. |
| 8108 | errHandler := func(js nats.JetStream, originalMsg *nats.Msg, err error) { |
| 8109 | if originalMsg == nil { |
nothing calls this directly
no test coverage detected