(t *testing.T)
| 9583 | } |
| 9584 | |
| 9585 | func TestJetStreamSubscribeContextCancel(t *testing.T) { |
| 9586 | s := RunBasicJetStreamServer() |
| 9587 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 9588 | |
| 9589 | nc, js := jsClient(t, s) |
| 9590 | defer nc.Close() |
| 9591 | |
| 9592 | var err error |
| 9593 | |
| 9594 | // Create the stream using our client API. |
| 9595 | _, err = js.AddStream(&nats.StreamConfig{ |
| 9596 | Name: "TEST", |
| 9597 | Subjects: []string{"foo", "bar", "baz", "foo.*"}, |
| 9598 | }) |
| 9599 | if err != nil { |
| 9600 | t.Fatalf("Unexpected error: %v", err) |
| 9601 | } |
| 9602 | |
| 9603 | toSend := 100 |
| 9604 | for i := 0; i < toSend; i++ { |
| 9605 | js.Publish("bar", []byte("foo")) |
| 9606 | } |
| 9607 | |
| 9608 | t.Run("cancel unsubscribes and deletes ephemeral", func(t *testing.T) { |
| 9609 | ctx, cancel := context.WithCancel(context.Background()) |
| 9610 | defer cancel() |
| 9611 | |
| 9612 | ch := make(chan *nats.Msg, 100) |
| 9613 | sub, err := js.Subscribe("bar", func(msg *nats.Msg) { |
| 9614 | ch <- msg |
| 9615 | |
| 9616 | // Cancel will unsubscribe and remove the subscription |
| 9617 | // of the consumer. |
| 9618 | if len(ch) >= 50 { |
| 9619 | cancel() |
| 9620 | } |
| 9621 | }, nats.Context(ctx)) |
| 9622 | if err != nil { |
| 9623 | t.Fatal(err) |
| 9624 | } |
| 9625 | |
| 9626 | select { |
| 9627 | case <-ctx.Done(): |
| 9628 | case <-time.After(3 * time.Second): |
| 9629 | t.Fatal("Timed out waiting for context to be canceled") |
| 9630 | } |
| 9631 | |
| 9632 | // Consumer should not be present since unsubscribe already called. |
| 9633 | checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { |
| 9634 | info, err := sub.ConsumerInfo() |
| 9635 | if err != nil && err == nats.ErrConsumerNotFound { |
| 9636 | return nil |
| 9637 | } |
| 9638 | return fmt.Errorf("Consumer still active, got: %v (info=%+v)", err, info) |
| 9639 | }) |
| 9640 | |
| 9641 | got := len(ch) |
| 9642 | expected := 50 |
nothing calls this directly
no test coverage detected