MCP · copy
hub / github.com/nats-io/nats.go / TestPublishWithScheduleSource

Function TestPublishWithScheduleSource

jetstream/test/publish_test.go:2251–2310  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2249}
2250
2251func TestPublishWithScheduleSource(t *testing.T) {
2252 srv := RunBasicJetStreamServer()
2253 defer shutdownJSServerAndRemoveStorage(t, srv)
2254 nc, err := nats.Connect(srv.ClientURL())
2255 if err != nil {
2256 t.Fatalf("Unexpected error: %v", err)
2257 }
2258 defer nc.Close()
2259
2260 js, err := jetstream.New(nc)
2261 if err != nil {
2262 t.Fatalf("Unexpected error: %v", err)
2263 }
2264
2265 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
2266 defer cancel()
2267 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
2268 Name: "SCHED",
2269 Subjects: []string{"schedule.>", "target.>", "source.>"},
2270 AllowMsgSchedules: true,
2271 })
2272 if err != nil {
2273 t.Fatalf("Unexpected error: %v", err)
2274 }
2275 cons, err := js.CreateConsumer(ctx, "SCHED", jetstream.ConsumerConfig{
2276 FilterSubject: "target.>",
2277 })
2278 if err != nil {
2279 t.Fatalf("Unexpected error: %v", err)
2280 }
2281
2282 ack, err := js.Publish(ctx, "schedule.sample", nil,
2283 jetstream.WithScheduleEvery(time.Second),
2284 jetstream.WithScheduleTarget("target.sample"),
2285 jetstream.WithScheduleSource("source.data"),
2286 )
2287 if err != nil {
2288 t.Fatalf("Unexpected error: %v", err)
2289 }
2290
2291 gotMsg, err := stream.GetMsg(ctx, ack.Sequence)
2292 if err != nil {
2293 t.Fatalf("Unexpected error: %v", err)
2294 }
2295 if got := gotMsg.Header.Get(jetstream.ScheduleSourceHeader); got != "source.data" {
2296 t.Fatalf("Expected schedule source header %q; got: %q", "source.data", got)
2297 }
2298
2299 if _, err := js.Publish(ctx, "source.data", []byte("hello")); err != nil {
2300 t.Fatalf("Unexpected error: %v", err)
2301 }
2302 msg, err := cons.Next(jetstream.FetchMaxWait(5 * time.Second))
2303 if err != nil {
2304 t.Fatalf("Unexpected error: %v", err)
2305 }
2306 if string(msg.Data()) != "hello" {
2307 t.Fatalf("Expected message data %q; got: %q", "hello", string(msg.Data()))
2308 }

Callers

nothing calls this directly

Calls 15

New · Function · 0.92
WithScheduleEvery · Function · 0.92
WithScheduleTarget · Function · 0.92
WithScheduleSource · Function · 0.92
FetchMaxWait · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
CreateConsumer · Method · 0.65
Publish · Method · 0.65

Tested by

no test coverage detected