MCPcopy
hub / github.com/nats-io/nats.go / TestPublishWithScheduleEvery

Function TestPublishWithScheduleEvery

jetstream/test/publish_test.go:2099–2152  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2097}
2098
2099func TestPublishWithScheduleEvery(t *testing.T) {
2100 srv := RunBasicJetStreamServer()
2101 defer shutdownJSServerAndRemoveStorage(t, srv)
2102 nc, err := nats.Connect(srv.ClientURL())
2103 if err != nil {
2104 t.Fatalf("Unexpected error: %v", err)
2105 }
2106 defer nc.Close()
2107
2108 js, err := jetstream.New(nc)
2109 if err != nil {
2110 t.Fatalf("Unexpected error: %v", err)
2111 }
2112
2113 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
2114 defer cancel()
2115 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
2116 Name: "SCHED",
2117 Subjects: []string{"schedule.>", "target.>"},
2118 AllowMsgSchedules: true,
2119 })
2120 if err != nil {
2121 t.Fatalf("Unexpected error: %v", err)
2122 }
2123
2124 ack, err := js.Publish(ctx, "schedule.every", nil,
2125 jetstream.WithScheduleEvery(5*time.Second),
2126 jetstream.WithScheduleTarget("target.every"),
2127 )
2128 if err != nil {
2129 t.Fatalf("Unexpected error: %v", err)
2130 }
2131
2132 // check if correct headers are set
2133 gotMsg, err := stream.GetMsg(ctx, ack.Sequence)
2134 if err != nil {
2135 t.Fatalf("Unexpected error: %v", err)
2136 }
2137 if got := gotMsg.Header.Get(jetstream.ScheduleHeader); got != "@every 5s" {
2138 t.Fatalf("Expected schedule header %q; got: %q", "@every 5s", got)
2139 }
2140 if got := gotMsg.Header.Get(jetstream.ScheduleTargetHeader); got != "target.every" {
2141 t.Fatalf("Expected schedule target header %q; got: %q", "target.every", got)
2142 }
2143
2144 // invalid duration (should be at least 1s)
2145 _, err = js.Publish(ctx, "schedule.every", nil,
2146 jetstream.WithScheduleEvery(500*time.Millisecond),
2147 jetstream.WithScheduleTarget("target.every"),
2148 )
2149 if !errors.Is(err, jetstream.ErrInvalidOption) {
2150 t.Fatalf("Expected ErrInvalidOption; got: %v", err)
2151 }
2152}
2153
2154func TestPublishWithScheduleMissingTarget(t *testing.T) {
2155 srv := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 13

NewFunction · 0.92
WithScheduleEveryFunction · 0.92
WithScheduleTargetFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
PublishMethod · 0.65
GetMsgMethod · 0.65
GetMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected