(t *testing.T)
| 8259 | } |
| 8260 | |
| 8261 | func TestPublishAsyncAckTimeout(t *testing.T) { |
| 8262 | s := RunBasicJetStreamServer() |
| 8263 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8264 | |
| 8265 | nc, err := nats.Connect(s.ClientURL()) |
| 8266 | if err != nil { |
| 8267 | t.Fatalf("Unexpected error: %v", err) |
| 8268 | } |
| 8269 | |
| 8270 | errs := make(chan error, 1) |
| 8271 | js, err := nc.JetStream( |
| 8272 | nats.PublishAsyncTimeout(50*time.Millisecond), |
| 8273 | nats.PublishAsyncErrHandler(func(js nats.JetStream, m *nats.Msg, e error) { |
| 8274 | errs <- e |
| 8275 | }), |
| 8276 | ) |
| 8277 | if err != nil { |
| 8278 | t.Fatalf("Unexpected error: %v", err) |
| 8279 | } |
| 8280 | defer nc.Close() |
| 8281 | |
| 8282 | _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true}) |
| 8283 | if err != nil { |
| 8284 | t.Fatalf("Unexpected error: %v", err) |
| 8285 | } |
| 8286 | |
| 8287 | ack, err := js.PublishAsync("FOO.A", []byte("hello")) |
| 8288 | if err != nil { |
| 8289 | t.Fatalf("Unexpected error: %v", err) |
| 8290 | } |
| 8291 | |
| 8292 | select { |
| 8293 | case <-ack.Ok(): |
| 8294 | t.Fatalf("Expected timeout") |
| 8295 | case err := <-ack.Err(): |
| 8296 | if !errors.Is(err, nats.ErrAsyncPublishTimeout) { |
| 8297 | t.Fatalf("Expected error: %v; got: %v", nats.ErrAsyncPublishTimeout, err) |
| 8298 | } |
| 8299 | case <-time.After(time.Second): |
| 8300 | t.Fatalf("Did not receive ack timeout") |
| 8301 | } |
| 8302 | |
| 8303 | // check if error callback is called |
| 8304 | select { |
| 8305 | case err := <-errs: |
| 8306 | if !errors.Is(err, nats.ErrAsyncPublishTimeout) { |
| 8307 | t.Fatalf("Expected error: %v; got: %v", nats.ErrAsyncPublishTimeout, err) |
| 8308 | } |
| 8309 | case <-time.After(time.Second): |
| 8310 | t.Fatalf("Did not receive error from error handler") |
| 8311 | } |
| 8312 | |
| 8313 | if js.PublishAsyncPending() != 0 { |
| 8314 | t.Fatalf("Expected no pending messages") |
| 8315 | } |
| 8316 | |
| 8317 | select { |
| 8318 | case <-js.PublishAsyncComplete(): |
nothing calls this directly
no test coverage detected