MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncAckTimeout

Function TestPublishAsyncAckTimeout

test/js_test.go:8261–8322  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

8259}
8260
8261func TestPublishAsyncAckTimeout(t *testing.T) {
8262 s := RunBasicJetStreamServer()
8263 defer shutdownJSServerAndRemoveStorage(t, s)
8264
8265 nc, err := nats.Connect(s.ClientURL())
8266 if err != nil {
8267 t.Fatalf("Unexpected error: %v", err)
8268 }
8269
8270 errs := make(chan error, 1)
8271 js, err := nc.JetStream(
8272 nats.PublishAsyncTimeout(50*time.Millisecond),
8273 nats.PublishAsyncErrHandler(func(js nats.JetStream, m *nats.Msg, e error) {
8274 errs <- e
8275 }),
8276 )
8277 if err != nil {
8278 t.Fatalf("Unexpected error: %v", err)
8279 }
8280 defer nc.Close()
8281
8282 _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true})
8283 if err != nil {
8284 t.Fatalf("Unexpected error: %v", err)
8285 }
8286
8287 ack, err := js.PublishAsync("FOO.A", []byte("hello"))
8288 if err != nil {
8289 t.Fatalf("Unexpected error: %v", err)
8290 }
8291
8292 select {
8293 case <-ack.Ok():
8294 t.Fatalf("Expected timeout")
8295 case err := <-ack.Err():
8296 if !errors.Is(err, nats.ErrAsyncPublishTimeout) {
8297 t.Fatalf("Expected error: %v; got: %v", nats.ErrAsyncPublishTimeout, err)
8298 }
8299 case <-time.After(time.Second):
8300 t.Fatalf("Did not receive ack timeout")
8301 }
8302
8303 // check if error callback is called
8304 select {
8305 case err := <-errs:
8306 if !errors.Is(err, nats.ErrAsyncPublishTimeout) {
8307 t.Fatalf("Expected error: %v; got: %v", nats.ErrAsyncPublishTimeout, err)
8308 }
8309 case <-time.After(time.Second):
8310 t.Fatalf("Did not receive error from error handler")
8311 }
8312
8313 if js.PublishAsyncPending() != 0 {
8314 t.Fatalf("Expected no pending messages")
8315 }
8316
8317 select {
8318 case <-js.PublishAsyncComplete():

Callers

nothing calls this directly

Calls 13

ConnectMethod · 0.80
FatalfMethod · 0.80
JetStreamMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
AddStreamMethod · 0.65
PublishAsyncMethod · 0.65
OkMethod · 0.65
ErrMethod · 0.65
PublishAsyncPendingMethod · 0.65
PublishAsyncCompleteMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected