MCPcopy
hub / github.com/nats-io/nats.go / TestStreamSourceWithConsumer

Function TestStreamSourceWithConsumer

jetstream/test/jetstream_test.go:2534–2596  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2532}
2533
2534func TestStreamSourceWithConsumer(t *testing.T) {
2535 srv := RunBasicJetStreamServer()
2536 defer shutdownJSServerAndRemoveStorage(t, srv)
2537
2538 nc, err := nats.Connect(srv.ClientURL())
2539 if err != nil {
2540 t.Fatalf("Unexpected error: %v", err)
2541 }
2542 defer nc.Close()
2543
2544 js, err := jetstream.New(nc)
2545 if err != nil {
2546 t.Fatalf("Unexpected error: %v", err)
2547 }
2548 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
2549 defer cancel()
2550
2551 upstream, err := js.CreateStream(ctx, jetstream.StreamConfig{
2552 Name: "UP",
2553 Subjects: []string{"up"},
2554 })
2555 if err != nil {
2556 t.Fatalf("CreateStream upstream: %v", err)
2557 }
2558
2559 if _, err := upstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
2560 Durable: "C",
2561 DeliverSubject: "deliver",
2562 AckPolicy: jetstream.AckFlowControlPolicy,
2563 IdleHeartbeat: time.Second,
2564 }); err != nil {
2565 t.Fatalf("CreateOrUpdateConsumer: %v", err)
2566 }
2567
2568 down, err := js.CreateStream(ctx, jetstream.StreamConfig{
2569 Name: "DOWN",
2570 Sources: []*jetstream.StreamSource{{
2571 Name: "UP",
2572 Consumer: &jetstream.StreamConsumerSource{
2573 Name: "C",
2574 DeliverSubject: "deliver",
2575 },
2576 }},
2577 })
2578 if err != nil {
2579 t.Fatalf("CreateStream down: %v", err)
2580 }
2581
2582 if _, err := js.Publish(ctx, "up", []byte("hello")); err != nil {
2583 t.Fatalf("Publish: %v", err)
2584 }
2585
2586 checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
2587 info, err := down.Info(ctx)
2588 if err != nil {
2589 return err
2590 }
2591 if info.State.Msgs != 1 {

Callers

nothing calls this directly

Calls 12

NewFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
ErrorfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
checkForFunction · 0.70
CreateStreamMethod · 0.65
PublishMethod · 0.65
InfoMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected