MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamSubscribeContextCancel

Function TestJetStreamSubscribeContextCancel

test/js_test.go:9585–9670  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

9583}
9584
9585func TestJetStreamSubscribeContextCancel(t *testing.T) {
9586 s := RunBasicJetStreamServer()
9587 defer shutdownJSServerAndRemoveStorage(t, s)
9588
9589 nc, js := jsClient(t, s)
9590 defer nc.Close()
9591
9592 var err error
9593
9594 // Create the stream using our client API.
9595 _, err = js.AddStream(&nats.StreamConfig{
9596 Name: "TEST",
9597 Subjects: []string{"foo", "bar", "baz", "foo.*"},
9598 })
9599 if err != nil {
9600 t.Fatalf("Unexpected error: %v", err)
9601 }
9602
9603 toSend := 100
9604 for i := 0; i < toSend; i++ {
9605 js.Publish("bar", []byte("foo"))
9606 }
9607
9608 t.Run("cancel unsubscribes and deletes ephemeral", func(t *testing.T) {
9609 ctx, cancel := context.WithCancel(context.Background())
9610 defer cancel()
9611
9612 ch := make(chan *nats.Msg, 100)
9613 sub, err := js.Subscribe("bar", func(msg *nats.Msg) {
9614 ch <- msg
9615
9616 // Cancel will unsubscribe and remove the subscription
9617 // of the consumer.
9618 if len(ch) >= 50 {
9619 cancel()
9620 }
9621 }, nats.Context(ctx))
9622 if err != nil {
9623 t.Fatal(err)
9624 }
9625
9626 select {
9627 case <-ctx.Done():
9628 case <-time.After(3 * time.Second):
9629 t.Fatal("Timed out waiting for context to be canceled")
9630 }
9631
9632 // Consumer should not be present since unsubscribe already called.
9633 checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
9634 info, err := sub.ConsumerInfo()
9635 if err != nil && err == nats.ErrConsumerNotFound {
9636 return nil
9637 }
9638 return fmt.Errorf("Consumer still active, got: %v (info=%+v)", err, info)
9639 })
9640
9641 got := len(ch)
9642 expected := 50

Callers

nothing calls this directly

Calls 14

Fatalf · Method · 0.80
Errorf · Method · 0.80
Unsubscribe · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
checkFor · Function · 0.70
AddStream · Method · 0.65
Publish · Method · 0.65
Subscribe · Method · 0.65
Context · Method · 0.65
Done · Method · 0.65

Tested by

no test coverage detected