(t *testing.T)
| 2038 | } |
| 2039 | |
| 2040 | func TestPublishWithScheduleAt(t *testing.T) { |
| 2041 | srv := RunBasicJetStreamServer() |
| 2042 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2043 | nc, err := nats.Connect(srv.ClientURL()) |
| 2044 | if err != nil { |
| 2045 | t.Fatalf("Unexpected error: %v", err) |
| 2046 | } |
| 2047 | defer nc.Close() |
| 2048 | |
| 2049 | js, err := jetstream.New(nc) |
| 2050 | if err != nil { |
| 2051 | t.Fatalf("Unexpected error: %v", err) |
| 2052 | } |
| 2053 | |
| 2054 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 2055 | defer cancel() |
| 2056 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 2057 | Name: "SCHED", |
| 2058 | Subjects: []string{"schedule.>", "target.>"}, |
| 2059 | AllowMsgSchedules: true, |
| 2060 | }) |
| 2061 | if err != nil { |
| 2062 | t.Fatalf("Unexpected error: %v", err) |
| 2063 | } |
| 2064 | cons, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ |
| 2065 | FilterSubject: "target.>", |
| 2066 | }) |
| 2067 | if err != nil { |
| 2068 | t.Fatalf("Unexpected error: %v", err) |
| 2069 | } |
| 2070 | |
| 2071 | schedTime := time.Now().Add(time.Second).UTC().Truncate(time.Second) |
| 2072 | ack, err := js.Publish(ctx, "schedule.at", []byte("hello"), |
| 2073 | jetstream.WithScheduleAt(schedTime), |
| 2074 | jetstream.WithScheduleTarget("target.at"), |
| 2075 | ) |
| 2076 | if err != nil { |
| 2077 | t.Fatalf("Unexpected error: %v", err) |
| 2078 | } |
| 2079 | gotMsg, err := stream.GetMsg(ctx, ack.Sequence) |
| 2080 | if err != nil { |
| 2081 | t.Fatalf("Unexpected error: %v", err) |
| 2082 | } |
| 2083 | if got := gotMsg.Header.Get(jetstream.ScheduleHeader); got != "@at "+schedTime.Format(time.RFC3339) { |
| 2084 | t.Fatalf("Expected schedule header %q; got: %q", "@at "+schedTime.Format(time.RFC3339), got) |
| 2085 | } |
| 2086 | if got := gotMsg.Header.Get(jetstream.ScheduleTargetHeader); got != "target.at" { |
| 2087 | t.Fatalf("Expected schedule target header %q; got: %q", "target.at", got) |
| 2088 | } |
| 2089 | |
| 2090 | msg, err := cons.Next(jetstream.FetchMaxWait(5 * time.Second)) |
| 2091 | if err != nil { |
| 2092 | t.Fatalf("Unexpected error: %v", err) |
| 2093 | } |
| 2094 | if string(msg.Data()) != "hello" { |
| 2095 | t.Fatalf("Expected message data %q; got: %q", "hello", string(msg.Data())) |
| 2096 | } |
| 2097 | } |
nothing calls this directly
no test coverage detected