MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncRetryInErrHandler

Function TestPublishAsyncRetryInErrHandler

test/js_test.go:8375–8437  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

8373}
8374
8375func TestPublishAsyncRetryInErrHandler(t *testing.T) {
8376 s := RunBasicJetStreamServer()
8377 defer shutdownJSServerAndRemoveStorage(t, s)
8378
8379 nc, err := nats.Connect(s.ClientURL())
8380 if err != nil {
8381 t.Fatalf("Unexpected error: %v", err)
8382 }
8383
8384 streamCreated := make(chan struct{})
8385 errCB := func(js nats.JetStream, m *nats.Msg, e error) {
8386 <-streamCreated
8387 _, err := js.PublishMsgAsync(m)
8388 if err != nil {
8389 t.Fatalf("Unexpected error when republishing: %v", err)
8390 }
8391 }
8392
8393 js, err := nc.JetStream(nats.PublishAsyncErrHandler(errCB))
8394 if err != nil {
8395 t.Fatalf("Unexpected error: %v", err)
8396 }
8397 defer nc.Close()
8398
8399 errs := make(chan error, 1)
8400 done := make(chan struct{}, 1)
8401 go func() {
8402 for i := 0; i < 10; i++ {
8403 if _, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil {
8404 errs <- err
8405 return
8406 }
8407 }
8408 done <- struct{}{}
8409 }()
8410 select {
8411 case <-done:
8412 case err := <-errs:
8413 t.Fatalf("Unexpected error during publish: %v", err)
8414 case <-time.After(5 * time.Second):
8415 t.Fatalf("Did not receive completion signal")
8416 }
8417 _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
8418 if err != nil {
8419 t.Fatalf("Unexpected error: %v", err)
8420 }
8421
8422 close(streamCreated)
8423 select {
8424 case <-js.PublishAsyncComplete():
8425 case <-time.After(5 * time.Second):
8426 t.Fatalf("Did not receive completion signal")
8427 }
8428
8429 info, err := js.StreamInfo("foo")
8430 if err != nil {
8431 t.Fatalf("Unexpected error: %v", err)
8432 }

Callers

nothing calls this directly

Calls 11

Connect · Method · 0.80
Fatalf · Method · 0.80
JetStream · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
PublishMsgAsync · Method · 0.65
PublishAsync · Method · 0.65
AddStream · Method · 0.65
PublishAsyncComplete · Method · 0.65
StreamInfo · Method · 0.65
Close · Method · 0.45

Tested by

no test coverage detected