(t *testing.T)
| 2532 | } |
| 2533 | |
| 2534 | func TestStreamSourceWithConsumer(t *testing.T) { |
| 2535 | srv := RunBasicJetStreamServer() |
| 2536 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2537 | |
| 2538 | nc, err := nats.Connect(srv.ClientURL()) |
| 2539 | if err != nil { |
| 2540 | t.Fatalf("Unexpected error: %v", err) |
| 2541 | } |
| 2542 | defer nc.Close() |
| 2543 | |
| 2544 | js, err := jetstream.New(nc) |
| 2545 | if err != nil { |
| 2546 | t.Fatalf("Unexpected error: %v", err) |
| 2547 | } |
| 2548 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 2549 | defer cancel() |
| 2550 | |
| 2551 | upstream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2552 | Name: "UP", |
| 2553 | Subjects: []string{"up"}, |
| 2554 | }) |
| 2555 | if err != nil { |
| 2556 | t.Fatalf("CreateStream upstream: %v", err) |
| 2557 | } |
| 2558 | |
| 2559 | if _, err := upstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ |
| 2560 | Durable: "C", |
| 2561 | DeliverSubject: "deliver", |
| 2562 | AckPolicy: jetstream.AckFlowControlPolicy, |
| 2563 | IdleHeartbeat: time.Second, |
| 2564 | }); err != nil { |
| 2565 | t.Fatalf("CreateOrUpdateConsumer: %v", err) |
| 2566 | } |
| 2567 | |
| 2568 | down, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2569 | Name: "DOWN", |
| 2570 | Sources: []*jetstream.StreamSource{{ |
| 2571 | Name: "UP", |
| 2572 | Consumer: &jetstream.StreamConsumerSource{ |
| 2573 | Name: "C", |
| 2574 | DeliverSubject: "deliver", |
| 2575 | }, |
| 2576 | }}, |
| 2577 | }) |
| 2578 | if err != nil { |
| 2579 | t.Fatalf("CreateStream down: %v", err) |
| 2580 | } |
| 2581 | |
| 2582 | if _, err := js.Publish(ctx, "up", []byte("hello")); err != nil { |
| 2583 | t.Fatalf("Publish: %v", err) |
| 2584 | } |
| 2585 | |
| 2586 | checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { |
| 2587 | info, err := down.Info(ctx) |
| 2588 | if err != nil { |
| 2589 | return err |
| 2590 | } |
| 2591 | if info.State.Msgs != 1 { |
nothing calls this directly
no test coverage detected