(t *testing.T)
| 325 | } |
| 326 | |
| 327 | func TestJetStreamSubscribeReconnect(t *testing.T) { |
| 328 | s := RunBasicJetStreamServer() |
| 329 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 330 | |
| 331 | rch := make(chan struct{}, 1) |
| 332 | nc, err := nats.Connect(s.ClientURL(), |
| 333 | nats.ReconnectWait(50*time.Millisecond), |
| 334 | nats.ReconnectHandler(func(_ *nats.Conn) { |
| 335 | select { |
| 336 | case rch <- struct{}{}: |
| 337 | default: |
| 338 | } |
| 339 | })) |
| 340 | if err != nil { |
| 341 | t.Fatalf("Unexpected error: %v", err) |
| 342 | } |
| 343 | defer nc.Close() |
| 344 | |
| 345 | js, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond)) |
| 346 | if err != nil { |
| 347 | t.Fatalf("Unexpected error: %v", err) |
| 348 | } |
| 349 | |
| 350 | // Create the stream using our client API. |
| 351 | _, err = js.AddStream(&nats.StreamConfig{ |
| 352 | Name: "TEST", |
| 353 | Subjects: []string{"foo"}, |
| 354 | }) |
| 355 | if err != nil { |
| 356 | t.Fatalf("Unexpected error: %v", err) |
| 357 | } |
| 358 | |
| 359 | sub, err := js.SubscribeSync("foo", nats.Durable("bar")) |
| 360 | if err != nil { |
| 361 | t.Fatalf("Error on subscribe: %v", err) |
| 362 | } |
| 363 | |
| 364 | sendAndReceive := func(msgContent string) { |
| 365 | t.Helper() |
| 366 | var ok bool |
| 367 | var err error |
| 368 | for i := 0; i < 5; i++ { |
| 369 | if _, err = js.Publish("foo", []byte(msgContent)); err != nil { |
| 370 | time.Sleep(250 * time.Millisecond) |
| 371 | continue |
| 372 | } |
| 373 | ok = true |
| 374 | break |
| 375 | } |
| 376 | if !ok { |
| 377 | t.Fatalf("Error on publish: %v", err) |
| 378 | } |
| 379 | msg, err := sub.NextMsg(time.Second) |
| 380 | if err != nil { |
| 381 | t.Fatal("Did not get message") |
| 382 | } |
| 383 | if string(msg.Data) != msgContent { |
| 384 | t.Fatalf("Unexpected content: %q", msg.Data) |
nothing calls this directly
no test coverage detected