(t *testing.T)
| 490 | } |
| 491 | |
| 492 | func TestPublishAsyncWithTTL(t *testing.T) { |
| 493 | srv := RunBasicJetStreamServer() |
| 494 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 495 | nc, js := jsClient(t, srv) |
| 496 | defer nc.Close() |
| 497 | |
| 498 | _, err := js.AddStream(&nats.StreamConfig{ |
| 499 | Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64, AllowMsgTTL: true}) |
| 500 | if err != nil { |
| 501 | t.Fatalf("Unexpected error: %v", err) |
| 502 | } |
| 503 | |
| 504 | paf, err := js.PublishAsync("FOO.1", []byte("msg"), nats.MsgTTL(1*time.Second)) |
| 505 | if err != nil { |
| 506 | t.Fatalf("Unexpected error: %v", err) |
| 507 | } |
| 508 | var ack *nats.PubAck |
| 509 | select { |
| 510 | case ack = <-paf.Ok(): |
| 511 | case <-time.After(5 * time.Second): |
| 512 | t.Fatalf("Did not receive ack") |
| 513 | } |
| 514 | |
| 515 | gotMsg, err := js.GetMsg("foo", ack.Sequence) |
| 516 | if err != nil { |
| 517 | t.Fatalf("Unexpected error: %v", err) |
| 518 | } |
| 519 | if ttl := gotMsg.Header.Get("Nats-TTL"); ttl != "1s" { |
| 520 | t.Fatalf("Expected message to have TTL header set to 1s; got: %s", ttl) |
| 521 | } |
| 522 | time.Sleep(1500 * time.Millisecond) |
| 523 | _, err = js.GetMsg("foo", ack.Sequence) |
| 524 | if !errors.Is(err, nats.ErrMsgNotFound) { |
| 525 | t.Fatalf("Expected not found error; got: %v", err) |
| 526 | } |
| 527 | } |
| 528 | |
| 529 | func TestJetStreamSubscribe(t *testing.T) { |
| 530 | s := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected