(t *testing.T)
| 1002 | } |
| 1003 | |
| 1004 | func TestPullConsumerFetch_WithCluster(t *testing.T) { |
| 1005 | testSubject := "FOO.123" |
| 1006 | testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} |
| 1007 | publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { |
| 1008 | for _, msg := range testMsgs { |
| 1009 | if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { |
| 1010 | t.Fatalf("Unexpected error during publish: %s", err) |
| 1011 | } |
| 1012 | } |
| 1013 | } |
| 1014 | |
| 1015 | name := "cluster" |
| 1016 | stream := jetstream.StreamConfig{ |
| 1017 | Name: name, |
| 1018 | Replicas: 1, |
| 1019 | Subjects: []string{"FOO.*"}, |
| 1020 | } |
| 1021 | t.Run("no options", func(t *testing.T) { |
| 1022 | withJSClusterAndStream(t, name, 3, stream, func(t *testing.T, subject string, srvs ...*jsServer) { |
| 1023 | srv := srvs[0] |
| 1024 | nc, err := nats.Connect(srv.ClientURL()) |
| 1025 | if err != nil { |
| 1026 | t.Fatalf("Unexpected error: %v", err) |
| 1027 | } |
| 1028 | |
| 1029 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1030 | defer cancel() |
| 1031 | js, err := jetstream.New(nc) |
| 1032 | if err != nil { |
| 1033 | t.Fatalf("Unexpected error: %v", err) |
| 1034 | } |
| 1035 | defer nc.Close() |
| 1036 | |
| 1037 | s, err := js.Stream(ctx, stream.Name) |
| 1038 | if err != nil { |
| 1039 | t.Fatalf("Unexpected error: %v", err) |
| 1040 | } |
| 1041 | |
| 1042 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) |
| 1043 | if err != nil { |
| 1044 | t.Fatalf("Unexpected error: %v", err) |
| 1045 | } |
| 1046 | |
| 1047 | publishTestMsgs(t, js) |
| 1048 | msgs, err := c.Fetch(5) |
| 1049 | if err != nil { |
| 1050 | t.Fatalf("Unexpected error: %v", err) |
| 1051 | } |
| 1052 | |
| 1053 | var i int |
| 1054 | for msg := range msgs.Messages() { |
| 1055 | if string(msg.Data()) != testMsgs[i] { |
| 1056 | t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, testMsgs[i], string(msg.Data())) |
| 1057 | } |
| 1058 | i++ |
| 1059 | } |
| 1060 | if msgs.Error() != nil { |
| 1061 | t.Fatalf("Unexpected error during fetch: %v", msgs.Error()) |
nothing calls this directly
no test coverage detected