(t *testing.T)
| 2310 | } |
| 2311 | |
| 2312 | func TestPublishWithScheduleTTL(t *testing.T) { |
| 2313 | srv := RunBasicJetStreamServer() |
| 2314 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2315 | nc, err := nats.Connect(srv.ClientURL()) |
| 2316 | if err != nil { |
| 2317 | t.Fatalf("Unexpected error: %v", err) |
| 2318 | } |
| 2319 | defer nc.Close() |
| 2320 | |
| 2321 | js, err := jetstream.New(nc) |
| 2322 | if err != nil { |
| 2323 | t.Fatalf("Unexpected error: %v", err) |
| 2324 | } |
| 2325 | |
| 2326 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 2327 | defer cancel() |
| 2328 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2329 | Name: "SCHED", |
| 2330 | Subjects: []string{"schedule.>", "target.>"}, |
| 2331 | AllowMsgSchedules: true, |
| 2332 | AllowMsgTTL: true, |
| 2333 | }) |
| 2334 | if err != nil { |
| 2335 | t.Fatalf("Unexpected error: %v", err) |
| 2336 | } |
| 2337 | |
| 2338 | t.Run("duration", func(t *testing.T) { |
| 2339 | ack, err := js.Publish(ctx, "schedule.ttl1", nil, |
| 2340 | jetstream.WithScheduleEvery(5*time.Second), |
| 2341 | jetstream.WithScheduleTarget("target.ttl1"), |
| 2342 | jetstream.WithScheduleTTL(10*time.Minute), |
| 2343 | ) |
| 2344 | if err != nil { |
| 2345 | t.Fatalf("Unexpected error: %v", err) |
| 2346 | } |
| 2347 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 2348 | if err != nil { |
| 2349 | t.Fatalf("Unexpected error: %v", err) |
| 2350 | } |
| 2351 | if got := gotMsg.Header.Get(jetstream.ScheduleTTLHeader); got != "10m0s" { |
| 2352 | t.Fatalf("Expected schedule TTL header %q; got: %q", "10m0s", got) |
| 2353 | } |
| 2354 | }) |
| 2355 | |
| 2356 | t.Run("never", func(t *testing.T) { |
| 2357 | ack, err := js.Publish(ctx, "schedule.ttl2", nil, |
| 2358 | jetstream.WithScheduleEvery(5*time.Second), |
| 2359 | jetstream.WithScheduleTarget("target.ttl2"), |
| 2360 | jetstream.WithScheduleTTLNever(), |
| 2361 | ) |
| 2362 | if err != nil { |
| 2363 | t.Fatalf("Unexpected error: %v", err) |
| 2364 | } |
| 2365 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 2366 | if err != nil { |
| 2367 | t.Fatalf("Unexpected error: %v", err) |
| 2368 | } |
| 2369 | if got := gotMsg.Header.Get(jetstream.ScheduleTTLHeader); got != "never" { |
nothing calls this directly
no test coverage detected