(t *testing.T)
| 11089 | } |
| 11090 | |
| 11091 | func TestPullConsumerFetchRace(t *testing.T) { |
| 11092 | srv := RunBasicJetStreamServer() |
| 11093 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 11094 | |
| 11095 | nc, js := jsClient(t, srv) |
| 11096 | defer nc.Close() |
| 11097 | |
| 11098 | _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 11099 | if err != nil { |
| 11100 | t.Fatalf("Unexpected error: %v", err) |
| 11101 | } |
| 11102 | |
| 11103 | for i := 0; i < 3; i++ { |
| 11104 | if _, err := js.Publish("FOO.123", []byte(fmt.Sprintf("msg-%d", i))); err != nil { |
| 11105 | t.Fatalf("Unexpected error during publish: %s", err) |
| 11106 | } |
| 11107 | } |
| 11108 | sub, err := js.PullSubscribe("FOO.123", "") |
| 11109 | if err != nil { |
| 11110 | t.Fatalf("Unexpected error: %v", err) |
| 11111 | } |
| 11112 | cons, err := sub.ConsumerInfo() |
| 11113 | if err != nil { |
| 11114 | t.Fatalf("Unexpected error: %v", err) |
| 11115 | } |
| 11116 | msgs, err := sub.FetchBatch(5) |
| 11117 | if err != nil { |
| 11118 | t.Fatalf("Unexpected error: %v", err) |
| 11119 | } |
| 11120 | errCh := make(chan error) |
| 11121 | go func() { |
| 11122 | for { |
| 11123 | err := msgs.Error() |
| 11124 | if err != nil { |
| 11125 | errCh <- err |
| 11126 | return |
| 11127 | } |
| 11128 | } |
| 11129 | }() |
| 11130 | deleteErrCh := make(chan error, 1) |
| 11131 | go func() { |
| 11132 | time.Sleep(100 * time.Millisecond) |
| 11133 | if err := js.DeleteConsumer("foo", cons.Name); err != nil { |
| 11134 | deleteErrCh <- err |
| 11135 | } |
| 11136 | close(deleteErrCh) |
| 11137 | }() |
| 11138 | |
| 11139 | var i int |
| 11140 | for msg := range msgs.Messages() { |
| 11141 | if string(msg.Data) != fmt.Sprintf("msg-%d", i) { |
| 11142 | t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, fmt.Sprintf("msg-%d", i), string(msg.Data)) |
| 11143 | } |
| 11144 | i++ |
| 11145 | } |
| 11146 | if i != 3 { |
| 11147 | t.Fatalf("Invalid number of messages received; want: %d; got: %d", 5, i) |
| 11148 | } |
nothing calls this directly
no test coverage detected