MCPcopy
hub / github.com/nats-io/nats.go / TestJetStream_UnsubscribeCloseDrain

Function TestJetStream_UnsubscribeCloseDrain

test/js_test.go:5816–5957  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

5814}
5815
5816func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
5817 s := RunBasicJetStreamServer()
5818 defer shutdownJSServerAndRemoveStorage(t, s)
5819
5820 serverURL := s.ClientURL()
5821 mc, jsm := jsClient(t, s)
5822 defer mc.Close()
5823
5824 var err error
5825
5826 _, err = jsm.AddStream(&nats.StreamConfig{
5827 Name: "foo",
5828 Subjects: []string{"foo.A", "foo.B", "foo.C"},
5829 })
5830 if err != nil {
5831 t.Fatalf("Unexpected error: %v", err)
5832 }
5833
5834 fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
5835 t.Helper()
5836 var infos []*nats.ConsumerInfo
5837 for info := range jsm.Consumers("foo") {
5838 infos = append(infos, info)
5839 }
5840 if len(infos) != expected {
5841 t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
5842 }
5843
5844 return infos
5845 }
5846
5847 t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {
5848 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5849 nc, err := nats.Connect(serverURL, nats.ClosedHandler(func(_ *nats.Conn) {
5850 cancel()
5851 }))
5852 if err != nil {
5853 t.Fatalf("Unexpected error: %v", err)
5854 }
5855
5856 js, err := nc.JetStream()
5857 if err != nil {
5858 t.Fatalf("Unexpected error: %v", err)
5859 }
5860 _, err = js.SubscribeSync("foo.C")
5861 if err != nil {
5862 t.Fatal(err)
5863 }
5864
5865 // sub.Drain() or nc.Drain() delete JS consumer, same than Unsubscribe()
5866 nc.Drain()
5867 <-ctx.Done()
5868 fetchConsumers(t, 0)
5869 })
5870
5871 jsm.Publish("foo.A", []byte("A.1"))
5872 jsm.Publish("foo.B", []byte("B.1"))
5873 jsm.Publish("foo.C", []byte("C.1"))

Callers

nothing calls this directly

Calls 15

FatalfMethod · 0.80
ConnectMethod · 0.80
ClosedHandlerMethod · 0.80
JetStreamMethod · 0.80
NextMsgMethod · 0.80
ErrorfMethod · 0.80
UnsubscribeMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
ConsumersMethod · 0.65

Tested by

no test coverage detected