(t *testing.T)
| 2415 | } |
| 2416 | |
| 2417 | func TestPublishAsyncWithSchedule(t *testing.T) { |
| 2418 | srv := RunBasicJetStreamServer() |
| 2419 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2420 | nc, err := nats.Connect(srv.ClientURL()) |
| 2421 | if err != nil { |
| 2422 | t.Fatalf("Unexpected error: %v", err) |
| 2423 | } |
| 2424 | defer nc.Close() |
| 2425 | |
| 2426 | js, err := jetstream.New(nc) |
| 2427 | if err != nil { |
| 2428 | t.Fatalf("Unexpected error: %v", err) |
| 2429 | } |
| 2430 | |
| 2431 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 2432 | defer cancel() |
| 2433 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2434 | Name: "SCHED", |
| 2435 | Subjects: []string{"schedule.>", "target.>"}, |
| 2436 | AllowMsgSchedules: true, |
| 2437 | }) |
| 2438 | if err != nil { |
| 2439 | t.Fatalf("Unexpected error: %v", err) |
| 2440 | } |
| 2441 | |
| 2442 | paf, err := js.PublishAsync("schedule.async", nil, |
| 2443 | jetstream.WithScheduleEvery(5*time.Second), |
| 2444 | jetstream.WithScheduleTarget("target.async"), |
| 2445 | ) |
| 2446 | if err != nil { |
| 2447 | t.Fatalf("Unexpected error: %v", err) |
| 2448 | } |
| 2449 | var ack *jetstream.PubAck |
| 2450 | select { |
| 2451 | case ack = <-paf.Ok(): |
| 2452 | case <-time.After(5 * time.Second): |
| 2453 | t.Fatalf("Did not receive ack") |
| 2454 | } |
| 2455 | |
| 2456 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 2457 | if err != nil { |
| 2458 | t.Fatalf("Unexpected error: %v", err) |
| 2459 | } |
| 2460 | if got := gotMsg.Header.Get(jetstream.ScheduleHeader); got != "@every 5s" { |
| 2461 | t.Fatalf("Expected schedule header %q; got: %q", "@every 5s", got) |
| 2462 | } |
| 2463 | if got := gotMsg.Header.Get(jetstream.ScheduleTargetHeader); got != "target.async" { |
| 2464 | t.Fatalf("Expected schedule target header %q; got: %q", "target.async", got) |
| 2465 | } |
| 2466 | } |
| 2467 | |
| 2468 | func TestScheduleAtDelivery(t *testing.T) { |
| 2469 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected