(t *testing.T)
| 2229 | } |
| 2230 | |
| 2231 | func TestOrderedConsumerCloseConn(t *testing.T) { |
| 2232 | srv := RunBasicJetStreamServer() |
| 2233 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2234 | nc, err := nats.Connect(srv.ClientURL()) |
| 2235 | if err != nil { |
| 2236 | t.Fatalf("Unexpected error: %v", err) |
| 2237 | } |
| 2238 | |
| 2239 | js, err := jetstream.New(nc) |
| 2240 | if err != nil { |
| 2241 | t.Fatalf("Unexpected error: %v", err) |
| 2242 | } |
| 2243 | defer nc.Close() |
| 2244 | |
| 2245 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
| 2246 | defer cancel() |
| 2247 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 2248 | if err != nil { |
| 2249 | t.Fatalf("Unexpected error: %v", err) |
| 2250 | } |
| 2251 | consumer, err := s.OrderedConsumer(context.Background(), jetstream.OrderedConsumerConfig{}) |
| 2252 | if err != nil { |
| 2253 | t.Fatalf("Failed to create ordered consumer: %v", err) |
| 2254 | } |
| 2255 | |
| 2256 | gotConnClosedErr := make(chan struct{}) |
| 2257 | |
| 2258 | time.AfterFunc(500*time.Millisecond, func() { |
| 2259 | nc.Close() |
| 2260 | }) |
| 2261 | |
| 2262 | oc, err := consumer.Consume(func(msg jetstream.Msg) { |
| 2263 | }, jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) { |
| 2264 | if errors.Is(err, jetstream.ErrConnectionClosed) { |
| 2265 | close(gotConnClosedErr) |
| 2266 | } |
| 2267 | })) |
| 2268 | if err != nil { |
| 2269 | t.Fatalf("Failed to consume: %v", err) |
| 2270 | } |
| 2271 | |
| 2272 | select { |
| 2273 | case <-gotConnClosedErr: |
| 2274 | case <-time.After(2 * time.Second): |
| 2275 | t.Fatal("Timeout waiting for connection closed error") |
| 2276 | } |
| 2277 | |
| 2278 | select { |
| 2279 | case <-oc.Closed(): |
| 2280 | case <-time.After(2 * time.Second): |
| 2281 | t.Fatalf("Timeout waiting for close") |
| 2282 | } |
| 2283 | } |
| 2284 | |
| 2285 | func TestOrderedConsumerCustomPrefix(t *testing.T) { |
| 2286 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected