(t *testing.T)
| 517 | } |
| 518 | |
| 519 | func TestConsumerPushVsPull(t *testing.T) { |
| 520 | srv := RunBasicJetStreamServer() |
| 521 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 522 | nc, err := nats.Connect(srv.ClientURL()) |
| 523 | if err != nil { |
| 524 | t.Fatalf("Unexpected error: %v", err) |
| 525 | } |
| 526 | |
| 527 | js, err := jetstream.New(nc) |
| 528 | if err != nil { |
| 529 | t.Fatalf("Unexpected error: %v", err) |
| 530 | } |
| 531 | defer nc.Close() |
| 532 | |
| 533 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 534 | defer cancel() |
| 535 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 536 | if err != nil { |
| 537 | t.Fatalf("Unexpected error: %v", err) |
| 538 | } |
| 539 | _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Name: "pull"}) |
| 540 | if err != nil { |
| 541 | t.Fatalf("Unexpected error: %v", err) |
| 542 | } |
| 543 | |
| 544 | _, err = s.CreatePushConsumer(ctx, jetstream.ConsumerConfig{Name: "push", DeliverSubject: "foo"}) |
| 545 | if err != nil { |
| 546 | t.Fatalf("Unexpected error: %v", err) |
| 547 | } |
| 548 | |
| 549 | _, err = s.CreatePushConsumer(ctx, jetstream.ConsumerConfig{}) |
| 550 | if !errors.Is(err, jetstream.ErrNotPushConsumer) { |
| 551 | t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNotPushConsumer, err) |
| 552 | } |
| 553 | |
| 554 | _, err = s.Consumer(ctx, "push") |
| 555 | if !errors.Is(err, jetstream.ErrNotPullConsumer) { |
| 556 | t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNotPullConsumer, err) |
| 557 | } |
| 558 | |
| 559 | _, err = s.PushConsumer(ctx, "pull") |
| 560 | if !errors.Is(err, jetstream.ErrNotPushConsumer) { |
| 561 | t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNotPushConsumer, err) |
| 562 | } |
| 563 | } |
| 564 | |
| 565 | func TestDeleteConsumer(t *testing.T) { |
| 566 | tests := []struct { |
nothing calls this directly
no test coverage detected