(t *testing.T)
| 2097 | } |
| 2098 | |
| 2099 | func TestPublishWithScheduleEvery(t *testing.T) { |
| 2100 | srv := RunBasicJetStreamServer() |
| 2101 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2102 | nc, err := nats.Connect(srv.ClientURL()) |
| 2103 | if err != nil { |
| 2104 | t.Fatalf("Unexpected error: %v", err) |
| 2105 | } |
| 2106 | defer nc.Close() |
| 2107 | |
| 2108 | js, err := jetstream.New(nc) |
| 2109 | if err != nil { |
| 2110 | t.Fatalf("Unexpected error: %v", err) |
| 2111 | } |
| 2112 | |
| 2113 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 2114 | defer cancel() |
| 2115 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2116 | Name: "SCHED", |
| 2117 | Subjects: []string{"schedule.>", "target.>"}, |
| 2118 | AllowMsgSchedules: true, |
| 2119 | }) |
| 2120 | if err != nil { |
| 2121 | t.Fatalf("Unexpected error: %v", err) |
| 2122 | } |
| 2123 | |
| 2124 | ack, err := js.Publish(ctx, "schedule.every", nil, |
| 2125 | jetstream.WithScheduleEvery(5*time.Second), |
| 2126 | jetstream.WithScheduleTarget("target.every"), |
| 2127 | ) |
| 2128 | if err != nil { |
| 2129 | t.Fatalf("Unexpected error: %v", err) |
| 2130 | } |
| 2131 | |
| 2132 | // check if correct headers are set |
| 2133 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 2134 | if err != nil { |
| 2135 | t.Fatalf("Unexpected error: %v", err) |
| 2136 | } |
| 2137 | if got := gotMsg.Header.Get(jetstream.ScheduleHeader); got != "@every 5s" { |
| 2138 | t.Fatalf("Expected schedule header %q; got: %q", "@every 5s", got) |
| 2139 | } |
| 2140 | if got := gotMsg.Header.Get(jetstream.ScheduleTargetHeader); got != "target.every" { |
| 2141 | t.Fatalf("Expected schedule target header %q; got: %q", "target.every", got) |
| 2142 | } |
| 2143 | |
| 2144 | // invalid duration (should be at least 1s) |
| 2145 | _, err = js.Publish(ctx, "schedule.every", nil, |
| 2146 | jetstream.WithScheduleEvery(500*time.Millisecond), |
| 2147 | jetstream.WithScheduleTarget("target.every"), |
| 2148 | ) |
| 2149 | if !errors.Is(err, jetstream.ErrInvalidOption) { |
| 2150 | t.Fatalf("Expected ErrInvalidOption; got: %v", err) |
| 2151 | } |
| 2152 | } |
| 2153 | |
| 2154 | func TestPublishWithScheduleMissingTarget(t *testing.T) { |
| 2155 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected