MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncWithTTL

Function TestPublishAsyncWithTTL

test/js_test.go:492–527  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

490}
491
492func TestPublishAsyncWithTTL(t *testing.T) {
493 srv := RunBasicJetStreamServer()
494 defer shutdownJSServerAndRemoveStorage(t, srv)
495 nc, js := jsClient(t, srv)
496 defer nc.Close()
497
498 _, err := js.AddStream(&nats.StreamConfig{
499 Name: "foo", Subjects: []string{"FOO.*"}, MaxMsgSize: 64, AllowMsgTTL: true})
500 if err != nil {
501 t.Fatalf("Unexpected error: %v", err)
502 }
503
504 paf, err := js.PublishAsync("FOO.1", []byte("msg"), nats.MsgTTL(1*time.Second))
505 if err != nil {
506 t.Fatalf("Unexpected error: %v", err)
507 }
508 var ack *nats.PubAck
509 select {
510 case ack = <-paf.Ok():
511 case <-time.After(5 * time.Second):
512 t.Fatalf("Did not receive ack")
513 }
514
515 gotMsg, err := js.GetMsg("foo", ack.Sequence)
516 if err != nil {
517 t.Fatalf("Unexpected error: %v", err)
518 }
519 if ttl := gotMsg.Header.Get("Nats-TTL"); ttl != "1s" {
520 t.Fatalf("Expected message to have TTL header set to 1s; got: %s", ttl)
521 }
522 time.Sleep(1500 * time.Millisecond)
523 _, err = js.GetMsg("foo", ack.Sequence)
524 if !errors.Is(err, nats.ErrMsgNotFound) {
525 t.Fatalf("Expected not found error; got: %v", err)
526 }
527}
528
529func TestJetStreamSubscribe(t *testing.T) {
530 s := RunBasicJetStreamServer()

Callers

nothing calls this directly

Calls 11

Fatalf — Method · 0.80
RunBasicJetStreamServer — Function · 0.70
jsClient — Function · 0.70
AddStream — Method · 0.65
PublishAsync — Method · 0.65
Ok — Method · 0.65
GetMsg — Method · 0.65
Get — Method · 0.65
Close — Method · 0.45
Is — Method · 0.45

Tested by

no test coverage detected