MCPcopy
hub / github.com/nats-io/nats.go / TestConsumer

Function TestConsumer

jetstream/test/stream_test.go:429–517  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

427}
428
429func TestConsumer(t *testing.T) {
430 tests := []struct {
431 name string
432 durable string
433 withError error
434 }{
435 {
436 name: "get existing consumer",
437 durable: "dur",
438 },
439 {
440 name: "consumer does not exist",
441 durable: "abc",
442 withError: jetstream.ErrConsumerNotFound,
443 },
444 {
445 name: "invalid durable name",
446 durable: "dur.123",
447 withError: jetstream.ErrInvalidConsumerName,
448 },
449 {
450 name: "empty durable name",
451 durable: "",
452 withError: jetstream.ErrInvalidConsumerName,
453 },
454 {
455 name: "empty consumer name",
456 durable: "",
457 withError: jetstream.ErrInvalidConsumerName,
458 },
459 }
460
461 srv := RunBasicJetStreamServer()
462 defer shutdownJSServerAndRemoveStorage(t, srv)
463 nc, err := nats.Connect(srv.ClientURL())
464 if err != nil {
465 t.Fatalf("Unexpected error: %v", err)
466 }
467
468 js, err := jetstream.New(nc)
469 if err != nil {
470 t.Fatalf("Unexpected error: %v", err)
471 }
472 defer nc.Close()
473
474 for _, consType := range []string{"pull", "push"} {
475 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
476 defer cancel()
477 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
478 if err != nil {
479 t.Fatalf("Unexpected error: %v", err)
480 }
481 if consType == "pull" {
482 _, err = s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
483 } else {
484 _, err = s.CreateOrUpdatePushConsumer(ctx, jetstream.ConsumerConfig{Durable: "dur", DeliverSubject: "inbox", AckPolicy: jetstream.AckAllPolicy, Description: "desc"})
485 }
486 if err != nil {

Callers

nothing calls this directly

Calls 14

CachedInfoMethod · 0.95
NewFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
ConsumerMethod · 0.65
PushConsumerMethod · 0.65
DeleteStreamMethod · 0.65

Tested by

no test coverage detected