MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamSubscribe

Function TestJetStreamSubscribe

test/js_test.go:529–1073  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

527}
528
529func TestJetStreamSubscribe(t *testing.T) {
530 s := RunBasicJetStreamServer()
531 defer shutdownJSServerAndRemoveStorage(t, s)
532
533 nc, js := jsClient(t, s)
534 defer nc.Close()
535
536 var err error
537
538 expectConsumers := func(t *testing.T, expected int) {
539 t.Helper()
540 checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
541 var infos []*nats.ConsumerInfo
542 for info := range js.Consumers("TEST") {
543 infos = append(infos, info)
544 }
545 if len(infos) != expected {
546 return fmt.Errorf("Expected %d consumers, got: %d", expected, len(infos))
547 }
548 return nil
549 })
550 }
551
552 // Create the stream using our client API.
553 _, err = js.AddStream(&nats.StreamConfig{
554 Name: "TEST",
555 Subjects: []string{"foo", "bar", "baz", "foo.*"},
556 })
557 if err != nil {
558 t.Fatalf("Unexpected error: %v", err)
559 }
560
561 // Lookup the stream for testing.
562 _, err = js.StreamInfo("TEST")
563 if err != nil {
564 t.Fatalf("stream lookup failed: %v", err)
565 }
566
567 // If stream name is not specified, then the subject is required.
568 if _, err := js.SubscribeSync(""); err == nil || !strings.Contains(err.Error(), "required") {
569 t.Fatalf("Unexpected error: %v", err)
570 }
571 // Check that if the stream name is present, then technically the subject does not have to be specified.
572 sub, err := js.SubscribeSync("", nats.BindStream("TEST"))
573 if err != nil {
574 t.Fatalf("Unexpected error: %v", err)
575 }
576 initialPending, err := sub.InitialConsumerPending()
577 if err != nil {
578 t.Fatalf("Unexpected error: %v", err)
579 }
580 if initialPending != 0 {
581 t.Fatalf("Expected no initial pending, got %d", initialPending)
582 }
583 sub.Unsubscribe()
584
585 // Check that Queue subscribe with HB or FC fails.
586 _, err = js.QueueSubscribeSync("foo", "wq", nats.IdleHeartbeat(time.Second))

Callers

nothing calls this directly

Calls 15

Errorf (Method) · 0.80
Fatalf (Method) · 0.80
Unsubscribe (Method) · 0.80
NextMsg (Method) · 0.80
Pending (Method) · 0.80
AckSync (Method) · 0.80
NextMsgWithContext (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
jsClient (Function) · 0.70
checkFor (Function) · 0.70

Tested by

no test coverage detected