(t *testing.T)
| 2466 | } |
| 2467 | |
| 2468 | func TestScheduleAtDelivery(t *testing.T) { |
| 2469 | srv := RunBasicJetStreamServer() |
| 2470 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2471 | nc, err := nats.Connect(srv.ClientURL()) |
| 2472 | if err != nil { |
| 2473 | t.Fatalf("Unexpected error: %v", err) |
| 2474 | } |
| 2475 | defer nc.Close() |
| 2476 | |
| 2477 | js, err := jetstream.New(nc) |
| 2478 | if err != nil { |
| 2479 | t.Fatalf("Unexpected error: %v", err) |
| 2480 | } |
| 2481 | |
| 2482 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 2483 | defer cancel() |
| 2484 | _, err = js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2485 | Name: "SCHED", |
| 2486 | Subjects: []string{"schedule.>", "target.>"}, |
| 2487 | AllowMsgSchedules: true, |
| 2488 | }) |
| 2489 | if err != nil { |
| 2490 | t.Fatalf("Unexpected error: %v", err) |
| 2491 | } |
| 2492 | |
| 2493 | cons, err := js.CreateConsumer(ctx, "SCHED", jetstream.ConsumerConfig{ |
| 2494 | FilterSubject: "target.at", |
| 2495 | }) |
| 2496 | if err != nil { |
| 2497 | t.Fatalf("Unexpected error: %v", err) |
| 2498 | } |
| 2499 | |
| 2500 | _, err = js.Publish(ctx, "schedule.at", []byte("scheduled payload"), |
| 2501 | jetstream.WithScheduleAt(time.Now().Add(2*time.Second)), |
| 2502 | jetstream.WithScheduleTarget("target.at"), |
| 2503 | ) |
| 2504 | if err != nil { |
| 2505 | t.Fatalf("Unexpected error: %v", err) |
| 2506 | } |
| 2507 | |
| 2508 | msg, err := cons.Next(jetstream.FetchMaxWait(5 * time.Second)) |
| 2509 | if err != nil { |
| 2510 | t.Fatalf("Expected to receive scheduled message: %v", err) |
| 2511 | } |
| 2512 | if string(msg.Data()) != "scheduled payload" { |
| 2513 | t.Fatalf("Expected payload %q; got: %q", "scheduled payload", string(msg.Data())) |
| 2514 | } |
| 2515 | if msg.Headers().Get(jetstream.SchedulerHeader) == "" { |
| 2516 | t.Fatal("Expected Nats-Scheduler header to be set on delivered message") |
| 2517 | } |
| 2518 | } |
| 2519 | |
| 2520 | func TestScheduleEveryDelivery(t *testing.T) { |
| 2521 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected