(t *testing.T)
| 5677 | } |
| 5678 | |
| 5679 | func TestJetStream_Unsubscribe(t *testing.T) { |
| 5680 | s := RunBasicJetStreamServer() |
| 5681 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 5682 | |
| 5683 | nc, js := jsClient(t, s) |
| 5684 | defer nc.Close() |
| 5685 | |
| 5686 | var err error |
| 5687 | |
| 5688 | _, err = js.AddStream(&nats.StreamConfig{ |
| 5689 | Name: "foo", |
| 5690 | Subjects: []string{"foo.A", "foo.B", "foo.C"}, |
| 5691 | }) |
| 5692 | if err != nil { |
| 5693 | t.Fatalf("Unexpected error: %v", err) |
| 5694 | } |
| 5695 | |
| 5696 | fetchConsumers := func(t *testing.T, expected int) { |
| 5697 | t.Helper() |
| 5698 | checkFor(t, time.Second, 15*time.Millisecond, func() error { |
| 5699 | var infos []*nats.ConsumerInfo |
| 5700 | for info := range js.Consumers("foo") { |
| 5701 | infos = append(infos, info) |
| 5702 | } |
| 5703 | if len(infos) != expected { |
| 5704 | return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos)) |
| 5705 | } |
| 5706 | return nil |
| 5707 | }) |
| 5708 | } |
| 5709 | |
| 5710 | deleteAllConsumers := func(t *testing.T) { |
| 5711 | t.Helper() |
| 5712 | for cn := range js.ConsumerNames("foo") { |
| 5713 | js.DeleteConsumer("foo", cn) |
| 5714 | } |
| 5715 | } |
| 5716 | |
| 5717 | js.Publish("foo.A", []byte("A")) |
| 5718 | js.Publish("foo.B", []byte("B")) |
| 5719 | js.Publish("foo.C", []byte("C")) |
| 5720 | |
| 5721 | t.Run("consumers deleted on unsubscribe", func(t *testing.T) { |
| 5722 | sub, err := js.SubscribeSync("foo.A") |
| 5723 | if err != nil { |
| 5724 | t.Fatal(err) |
| 5725 | } |
| 5726 | if err := sub.Unsubscribe(); err != nil { |
| 5727 | t.Errorf("Unexpected error: %v", err) |
| 5728 | } |
| 5729 | |
| 5730 | sub, err = js.SubscribeSync("foo.B", nats.Durable("B")) |
| 5731 | if err != nil { |
| 5732 | t.Fatal(err) |
| 5733 | } |
| 5734 | if err := sub.Unsubscribe(); err != nil { |
| 5735 | t.Errorf("Unexpected error: %v", err) |
| 5736 | } |
nothing calls this directly
no test coverage detected