MCPcopy
hub / github.com/nats-io/nats.go / TestPullConsumerFetch_WithCluster

Function TestPullConsumerFetch_WithCluster

jetstream/test/pull_test.go:1004–1103  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1002}
1003
1004func TestPullConsumerFetch_WithCluster(t *testing.T) {
1005 testSubject := "FOO.123"
1006 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
1007 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
1008 for _, msg := range testMsgs {
1009 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
1010 t.Fatalf("Unexpected error during publish: %s", err)
1011 }
1012 }
1013 }
1014
1015 name := "cluster"
1016 stream := jetstream.StreamConfig{
1017 Name: name,
1018 Replicas: 1,
1019 Subjects: []string{"FOO.*"},
1020 }
1021 t.Run("no options", func(t *testing.T) {
1022 withJSClusterAndStream(t, name, 3, stream, func(t *testing.T, subject string, srvs ...*jsServer) {
1023 srv := srvs[0]
1024 nc, err := nats.Connect(srv.ClientURL())
1025 if err != nil {
1026 t.Fatalf("Unexpected error: %v", err)
1027 }
1028
1029 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1030 defer cancel()
1031 js, err := jetstream.New(nc)
1032 if err != nil {
1033 t.Fatalf("Unexpected error: %v", err)
1034 }
1035 defer nc.Close()
1036
1037 s, err := js.Stream(ctx, stream.Name)
1038 if err != nil {
1039 t.Fatalf("Unexpected error: %v", err)
1040 }
1041
1042 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
1043 if err != nil {
1044 t.Fatalf("Unexpected error: %v", err)
1045 }
1046
1047 publishTestMsgs(t, js)
1048 msgs, err := c.Fetch(5)
1049 if err != nil {
1050 t.Fatalf("Unexpected error: %v", err)
1051 }
1052
1053 var i int
1054 for msg := range msgs.Messages() {
1055 if string(msg.Data()) != testMsgs[i] {
1056 t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, testMsgs[i], string(msg.Data()))
1057 }
1058 i++
1059 }
1060 if msgs.Error() != nil {
1061 t.Fatalf("Unexpected error during fetch: %v", msgs.Error())

Callers

nothing calls this directly

Calls 13

New (Function) · 0.92
Fatalf (Method) · 0.80
Connect (Method) · 0.80
withJSClusterAndStream (Function) · 0.70
Publish (Method) · 0.65
Stream (Method) · 0.65
Fetch (Method) · 0.65
Messages (Method) · 0.65
Data (Method) · 0.65
Error (Method) · 0.65
FetchNoWait (Method) · 0.65

Tested by

no test coverage detected