(t *testing.T)
| 8373 | } |
| 8374 | |
| 8375 | func TestPublishAsyncRetryInErrHandler(t *testing.T) { |
| 8376 | s := RunBasicJetStreamServer() |
| 8377 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 8378 | |
| 8379 | nc, err := nats.Connect(s.ClientURL()) |
| 8380 | if err != nil { |
| 8381 | t.Fatalf("Unexpected error: %v", err) |
| 8382 | } |
| 8383 | |
| 8384 | streamCreated := make(chan struct{}) |
| 8385 | errCB := func(js nats.JetStream, m *nats.Msg, e error) { |
| 8386 | <-streamCreated |
| 8387 | _, err := js.PublishMsgAsync(m) |
| 8388 | if err != nil { |
| 8389 | t.Fatalf("Unexpected error when republishing: %v", err) |
| 8390 | } |
| 8391 | } |
| 8392 | |
| 8393 | js, err := nc.JetStream(nats.PublishAsyncErrHandler(errCB)) |
| 8394 | if err != nil { |
| 8395 | t.Fatalf("Unexpected error: %v", err) |
| 8396 | } |
| 8397 | defer nc.Close() |
| 8398 | |
| 8399 | errs := make(chan error, 1) |
| 8400 | done := make(chan struct{}, 1) |
| 8401 | go func() { |
| 8402 | for i := 0; i < 10; i++ { |
| 8403 | if _, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil { |
| 8404 | errs <- err |
| 8405 | return |
| 8406 | } |
| 8407 | } |
| 8408 | done <- struct{}{} |
| 8409 | }() |
| 8410 | select { |
| 8411 | case <-done: |
| 8412 | case err := <-errs: |
| 8413 | t.Fatalf("Unexpected error during publish: %v", err) |
| 8414 | case <-time.After(5 * time.Second): |
| 8415 | t.Fatalf("Did not receive completion signal") |
| 8416 | } |
| 8417 | _, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 8418 | if err != nil { |
| 8419 | t.Fatalf("Unexpected error: %v", err) |
| 8420 | } |
| 8421 | |
| 8422 | close(streamCreated) |
| 8423 | select { |
| 8424 | case <-js.PublishAsyncComplete(): |
| 8425 | case <-time.After(5 * time.Second): |
| 8426 | t.Fatalf("Did not receive completion signal") |
| 8427 | } |
| 8428 | |
| 8429 | info, err := js.StreamInfo("foo") |
| 8430 | if err != nil { |
| 8431 | t.Fatalf("Unexpected error: %v", err) |
| 8432 | } |
nothing calls this directly
no test coverage detected