(t *testing.T)
| 700 | } |
| 701 | |
| 702 | func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) { |
| 703 | conf := createConfFile(t, []byte(` |
| 704 | listen: 127.0.0.1:-1 |
| 705 | jetstream: enabled |
| 706 | no_auth_user: pc |
| 707 | accounts: { |
| 708 | JS: { |
| 709 | jetstream: enabled |
| 710 | users: [ {user: pc, password: foo} ] |
| 711 | }, |
| 712 | } |
| 713 | `)) |
| 714 | defer os.Remove(conf) |
| 715 | |
| 716 | s, _ := RunServerWithConfig(conf) |
| 717 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 718 | |
| 719 | nc, js := jsClient(t, s) |
| 720 | defer nc.Close() |
| 721 | |
| 722 | var err error |
| 723 | |
| 724 | // Create a stream. |
| 725 | if _, err = js.AddStream(&nats.StreamConfig{Name: "STALL"}); err != nil { |
| 726 | t.Fatalf("Unexpected error: %v", err) |
| 727 | } |
| 728 | |
| 729 | _, err = js.StreamInfo("STALL") |
| 730 | if err != nil { |
| 731 | t.Fatalf("stream lookup failed: %v", err) |
| 732 | } |
| 733 | |
| 734 | msg := []byte(strings.Repeat("A", 512)) |
| 735 | toSend := 100_000 |
| 736 | for i := 0; i < toSend; i++ { |
| 737 | // Use plain NATS here for speed. |
| 738 | if _, err := js.PublishAsync("STALL", msg); err != nil { |
| 739 | t.Fatalf("Unexpected error: %v", err) |
| 740 | } |
| 741 | } |
| 742 | select { |
| 743 | case <-js.PublishAsyncComplete(): |
| 744 | case <-time.After(5 * time.Second): |
| 745 | t.Fatalf("Timeout waiting for messages") |
| 746 | } |
| 747 | nc.Flush() |
| 748 | |
| 749 | batch := 100 |
| 750 | msgs := make(chan *nats.Msg, batch-2) |
| 751 | sub, err := js.ChanSubscribe("STALL", msgs, |
| 752 | nats.Durable("dlc"), |
| 753 | nats.EnableFlowControl(), |
| 754 | nats.IdleHeartbeat(5*time.Second), |
| 755 | nats.MaxAckPending(batch-2), |
| 756 | ) |
| 757 | if err != nil { |
| 758 | t.Fatalf("Unexpected error: %v", err) |
| 759 | } |
nothing calls this directly
no test coverage detected