(t *testing.T)
| 5814 | } |
| 5815 | |
| 5816 | func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { |
| 5817 | s := RunBasicJetStreamServer() |
| 5818 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 5819 | |
| 5820 | serverURL := s.ClientURL() |
| 5821 | mc, jsm := jsClient(t, s) |
| 5822 | defer mc.Close() |
| 5823 | |
| 5824 | var err error |
| 5825 | |
| 5826 | _, err = jsm.AddStream(&nats.StreamConfig{ |
| 5827 | Name: "foo", |
| 5828 | Subjects: []string{"foo.A", "foo.B", "foo.C"}, |
| 5829 | }) |
| 5830 | if err != nil { |
| 5831 | t.Fatalf("Unexpected error: %v", err) |
| 5832 | } |
| 5833 | |
| 5834 | fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { |
| 5835 | t.Helper() |
| 5836 | var infos []*nats.ConsumerInfo |
| 5837 | for info := range jsm.Consumers("foo") { |
| 5838 | infos = append(infos, info) |
| 5839 | } |
| 5840 | if len(infos) != expected { |
| 5841 | t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) |
| 5842 | } |
| 5843 | |
| 5844 | return infos |
| 5845 | } |
| 5846 | |
| 5847 | t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) { |
| 5848 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 5849 | nc, err := nats.Connect(serverURL, nats.ClosedHandler(func(_ *nats.Conn) { |
| 5850 | cancel() |
| 5851 | })) |
| 5852 | if err != nil { |
| 5853 | t.Fatalf("Unexpected error: %v", err) |
| 5854 | } |
| 5855 | |
| 5856 | js, err := nc.JetStream() |
| 5857 | if err != nil { |
| 5858 | t.Fatalf("Unexpected error: %v", err) |
| 5859 | } |
| 5860 | _, err = js.SubscribeSync("foo.C") |
| 5861 | if err != nil { |
| 5862 | t.Fatal(err) |
| 5863 | } |
| 5864 | |
| 5865 | // sub.Drain() or nc.Drain() deletes the JS consumer, the same as Unsubscribe() |
| 5866 | nc.Drain() |
| 5867 | <-ctx.Done() |
| 5868 | fetchConsumers(t, 0) |
| 5869 | }) |
| 5870 | |
| 5871 | jsm.Publish("foo.A", []byte("A.1")) |
| 5872 | jsm.Publish("foo.B", []byte("B.1")) |
| 5873 | jsm.Publish("foo.C", []byte("C.1")) |
nothing calls this directly
no test coverage detected