MCPcopy
hub / github.com/nats-io/nats.go / TestPullConsumerFetchRace

Function TestPullConsumerFetchRace

test/js_test.go:11091–11169  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

11089}
11090
11091func TestPullConsumerFetchRace(t *testing.T) {
11092 srv := RunBasicJetStreamServer()
11093 defer shutdownJSServerAndRemoveStorage(t, srv)
11094
11095 nc, js := jsClient(t, srv)
11096 defer nc.Close()
11097
11098 _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
11099 if err != nil {
11100 t.Fatalf("Unexpected error: %v", err)
11101 }
11102
11103 for i := 0; i < 3; i++ {
11104 if _, err := js.Publish("FOO.123", []byte(fmt.Sprintf("msg-%d", i))); err != nil {
11105 t.Fatalf("Unexpected error during publish: %s", err)
11106 }
11107 }
11108 sub, err := js.PullSubscribe("FOO.123", "")
11109 if err != nil {
11110 t.Fatalf("Unexpected error: %v", err)
11111 }
11112 cons, err := sub.ConsumerInfo()
11113 if err != nil {
11114 t.Fatalf("Unexpected error: %v", err)
11115 }
11116 msgs, err := sub.FetchBatch(5)
11117 if err != nil {
11118 t.Fatalf("Unexpected error: %v", err)
11119 }
11120 errCh := make(chan error)
11121 go func() {
11122 for {
11123 err := msgs.Error()
11124 if err != nil {
11125 errCh <- err
11126 return
11127 }
11128 }
11129 }()
11130 deleteErrCh := make(chan error, 1)
11131 go func() {
11132 time.Sleep(100 * time.Millisecond)
11133 if err := js.DeleteConsumer("foo", cons.Name); err != nil {
11134 deleteErrCh <- err
11135 }
11136 close(deleteErrCh)
11137 }()
11138
11139 var i int
11140 for msg := range msgs.Messages() {
11141 if string(msg.Data) != fmt.Sprintf("msg-%d", i) {
11142 t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, fmt.Sprintf("msg-%d", i), string(msg.Data))
11143 }
11144 i++
11145 }
11146 if i != 3 {
11147 t.Fatalf("Invalid number of messages received; want: %d; got: %d", 5, i)
11148 }

Callers

nothing calls this directly

Calls 14

FatalfMethod · 0.80
FetchBatchMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PublishMethod · 0.65
PullSubscribeMethod · 0.65
ConsumerInfoMethod · 0.65
ErrorMethod · 0.65
DeleteConsumerMethod · 0.65
MessagesMethod · 0.65

Tested by

no test coverage detected