(t *testing.T)
| 2249 | } |
| 2250 | |
| 2251 | func TestPublishWithScheduleSource(t *testing.T) { |
| 2252 | srv := RunBasicJetStreamServer() |
| 2253 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2254 | nc, err := nats.Connect(srv.ClientURL()) |
| 2255 | if err != nil { |
| 2256 | t.Fatalf("Unexpected error: %v", err) |
| 2257 | } |
| 2258 | defer nc.Close() |
| 2259 | |
| 2260 | js, err := jetstream.New(nc) |
| 2261 | if err != nil { |
| 2262 | t.Fatalf("Unexpected error: %v", err) |
| 2263 | } |
| 2264 | |
| 2265 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 2266 | defer cancel() |
| 2267 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2268 | Name: "SCHED", |
| 2269 | Subjects: []string{"schedule.>", "target.>", "source.>"}, |
| 2270 | AllowMsgSchedules: true, |
| 2271 | }) |
| 2272 | if err != nil { |
| 2273 | t.Fatalf("Unexpected error: %v", err) |
| 2274 | } |
| 2275 | cons, err := js.CreateConsumer(ctx, "SCHED", jetstream.ConsumerConfig{ |
| 2276 | FilterSubject: "target.>", |
| 2277 | }) |
| 2278 | if err != nil { |
| 2279 | t.Fatalf("Unexpected error: %v", err) |
| 2280 | } |
| 2281 | |
| 2282 | ack, err := js.Publish(ctx, "schedule.sample", nil, |
| 2283 | jetstream.WithScheduleEvery(time.Second), |
| 2284 | jetstream.WithScheduleTarget("target.sample"), |
| 2285 | jetstream.WithScheduleSource("source.data"), |
| 2286 | ) |
| 2287 | if err != nil { |
| 2288 | t.Fatalf("Unexpected error: %v", err) |
| 2289 | } |
| 2290 | |
| 2291 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 2292 | if err != nil { |
| 2293 | t.Fatalf("Unexpected error: %v", err) |
| 2294 | } |
| 2295 | if got := gotMsg.Header.Get(jetstream.ScheduleSourceHeader); got != "source.data" { |
| 2296 | t.Fatalf("Expected schedule source header %q; got: %q", "source.data", got) |
| 2297 | } |
| 2298 | |
| 2299 | if _, err := js.Publish(ctx, "source.data", []byte("hello")); err != nil { |
| 2300 | t.Fatalf("Unexpected error: %v", err) |
| 2301 | } |
| 2302 | msg, err := cons.Next(jetstream.FetchMaxWait(5 * time.Second)) |
| 2303 | if err != nil { |
| 2304 | t.Fatalf("Unexpected error: %v", err) |
| 2305 | } |
| 2306 | if string(msg.Data()) != "hello" { |
| 2307 | t.Fatalf("Expected message data %q; got: %q", "hello", string(msg.Data())) |
| 2308 | } |
nothing calls this directly
no test coverage detected