(t *testing.T)
| 1033 | } |
| 1034 | |
| 1035 | func TestGetLastMsgForSubject(t *testing.T) { |
| 1036 | tests := []struct { |
| 1037 | name string |
| 1038 | subject string |
| 1039 | expectedData string |
| 1040 | allowDirect bool |
| 1041 | timeout time.Duration |
| 1042 | withError error |
| 1043 | }{ |
| 1044 | { |
| 1045 | name: "get existing msg", |
| 1046 | subject: "*.A", |
| 1047 | expectedData: "msg 4 on subject A", |
| 1048 | timeout: 5 * time.Second, |
| 1049 | }, |
| 1050 | { |
| 1051 | name: "with empty context", |
| 1052 | subject: "*.A", |
| 1053 | expectedData: "msg 4 on subject A", |
| 1054 | }, |
| 1055 | { |
| 1056 | name: "get last msg from stream", |
| 1057 | subject: ">", |
| 1058 | expectedData: "msg 4 on subject B", |
| 1059 | timeout: 5 * time.Second, |
| 1060 | }, |
| 1061 | { |
| 1062 | name: "no messages on subject", |
| 1063 | subject: "*.Z", |
| 1064 | withError: jetstream.ErrMsgNotFound, |
| 1065 | }, |
| 1066 | { |
| 1067 | name: "context timeout", |
| 1068 | subject: "*.A", |
| 1069 | timeout: 1 * time.Microsecond, |
| 1070 | withError: context.DeadlineExceeded, |
| 1071 | }, |
| 1072 | } |
| 1073 | |
| 1074 | srv := RunBasicJetStreamServer() |
| 1075 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1076 | nc, err := nats.Connect(srv.ClientURL()) |
| 1077 | if err != nil { |
| 1078 | t.Fatalf("Unexpected error: %v", err) |
| 1079 | } |
| 1080 | |
| 1081 | js, err := jetstream.New(nc) |
| 1082 | if err != nil { |
| 1083 | t.Fatalf("Unexpected error: %v", err) |
| 1084 | } |
| 1085 | defer nc.Close() |
| 1086 | |
| 1087 | s1, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, Description: "desc"}) |
| 1088 | if err != nil { |
| 1089 | t.Fatalf("Unexpected error: %v", err) |
| 1090 | } |
| 1091 | for i := 1; i < 5; i++ { |
| 1092 | if _, err := js.Publish(context.Background(), "FOO.A", []byte(fmt.Sprintf("msg %d on subject A", i))); err != nil { |
nothing calls this directly
no test coverage detected