MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncWithSchedule

Function TestPublishAsyncWithSchedule

jetstream/test/publish_test.go:2417–2466  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2415}
2416
2417func TestPublishAsyncWithSchedule(t *testing.T) {
2418 srv := RunBasicJetStreamServer()
2419 defer shutdownJSServerAndRemoveStorage(t, srv)
2420 nc, err := nats.Connect(srv.ClientURL())
2421 if err != nil {
2422 t.Fatalf("Unexpected error: %v", err)
2423 }
2424 defer nc.Close()
2425
2426 js, err := jetstream.New(nc)
2427 if err != nil {
2428 t.Fatalf("Unexpected error: %v", err)
2429 }
2430
2431 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
2432 defer cancel()
2433 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
2434 Name: "SCHED",
2435 Subjects: []string{"schedule.>", "target.>"},
2436 AllowMsgSchedules: true,
2437 })
2438 if err != nil {
2439 t.Fatalf("Unexpected error: %v", err)
2440 }
2441
2442 paf, err := js.PublishAsync("schedule.async", nil,
2443 jetstream.WithScheduleEvery(5*time.Second),
2444 jetstream.WithScheduleTarget("target.async"),
2445 )
2446 if err != nil {
2447 t.Fatalf("Unexpected error: %v", err)
2448 }
2449 var ack *jetstream.PubAck
2450 select {
2451 case ack = <-paf.Ok():
2452 case <-time.After(5 * time.Second):
2453 t.Fatalf("Did not receive ack")
2454 }
2455
2456 gotMsg, err := stream.GetMsg(ctx, ack.Sequence)
2457 if err != nil {
2458 t.Fatalf("Unexpected error: %v", err)
2459 }
2460 if got := gotMsg.Header.Get(jetstream.ScheduleHeader); got != "@every 5s" {
2461 t.Fatalf("Expected schedule header %q; got: %q", "@every 5s", got)
2462 }
2463 if got := gotMsg.Header.Get(jetstream.ScheduleTargetHeader); got != "target.async" {
2464 t.Fatalf("Expected schedule target header %q; got: %q", "target.async", got)
2465 }
2466}
2467
2468func TestScheduleAtDelivery(t *testing.T) {
2469 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 13

NewFunction · 0.92
WithScheduleEveryFunction · 0.92
WithScheduleTargetFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
PublishAsyncMethod · 0.65
OkMethod · 0.65
GetMsgMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected