MCPcopy
hub / github.com/nats-io/nats.go / TestPublishWithScheduleAt

Function TestPublishWithScheduleAt

jetstream/test/publish_test.go:2040–2097  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2038}
2039
2040func TestPublishWithScheduleAt(t *testing.T) {
2041 srv := RunBasicJetStreamServer()
2042 defer shutdownJSServerAndRemoveStorage(t, srv)
2043 nc, err := nats.Connect(srv.ClientURL())
2044 if err != nil {
2045 t.Fatalf("Unexpected error: %v", err)
2046 }
2047 defer nc.Close()
2048
2049 js, err := jetstream.New(nc)
2050 if err != nil {
2051 t.Fatalf("Unexpected error: %v", err)
2052 }
2053
2054 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
2055 defer cancel()
2056 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
2057 Name: "SCHED",
2058 Subjects: []string{"schedule.>", "target.>"},
2059 AllowMsgSchedules: true,
2060 })
2061 if err != nil {
2062 t.Fatalf("Unexpected error: %v", err)
2063 }
2064 cons, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
2065 FilterSubject: "target.>",
2066 })
2067 if err != nil {
2068 t.Fatalf("Unexpected error: %v", err)
2069 }
2070
2071 schedTime := time.Now().Add(time.Second).UTC().Truncate(time.Second)
2072 ack, err := js.Publish(ctx, "schedule.at", []byte("hello"),
2073 jetstream.WithScheduleAt(schedTime),
2074 jetstream.WithScheduleTarget("target.at"),
2075 )
2076 if err != nil {
2077 t.Fatalf("Unexpected error: %v", err)
2078 }
2079 gotMsg, err := stream.GetMsg(ctx, ack.Sequence)
2080 if err != nil {
2081 t.Fatalf("Unexpected error: %v", err)
2082 }
2083 if got := gotMsg.Header.Get(jetstream.ScheduleHeader); got != "@at "+schedTime.Format(time.RFC3339) {
2084 t.Fatalf("Expected schedule header %q; got: %q", "@at "+schedTime.Format(time.RFC3339), got)
2085 }
2086 if got := gotMsg.Header.Get(jetstream.ScheduleTargetHeader); got != "target.at" {
2087 t.Fatalf("Expected schedule target header %q; got: %q", "target.at", got)
2088 }
2089
2090 msg, err := cons.Next(jetstream.FetchMaxWait(5 * time.Second))
2091 if err != nil {
2092 t.Fatalf("Unexpected error: %v", err)
2093 }
2094 if string(msg.Data()) != "hello" {
2095 t.Fatalf("Expected message data %q; got: %q", "hello", string(msg.Data()))
2096 }
2097}

Callers

nothing calls this directly

Calls 15

NewFunction · 0.92
WithScheduleAtFunction · 0.92
WithScheduleTargetFunction · 0.92
FetchMaxWaitFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
CreateConsumerMethod · 0.65
AddMethod · 0.65
PublishMethod · 0.65

Tested by

no test coverage detected