MCPcopy
hub / github.com/nats-io/nats.go / TestPublishTimeout

Function TestPublishTimeout

jetstream/test/publish_test.go:860–893  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

858}
859
860func TestPublishTimeout(t *testing.T) {
861 srv := RunBasicJetStreamServer()
862 defer shutdownJSServerAndRemoveStorage(t, srv)
863 nc, err := nats.Connect(srv.ClientURL())
864 if err != nil {
865 t.Fatalf("Unexpected error: %v", err)
866 }
867
868 js, err := jetstream.New(nc, jetstream.WithDefaultTimeout(200*time.Millisecond))
869 if err != nil {
870 t.Fatalf("Unexpected error: %v", err)
871 }
872 defer nc.Close()
873
874 // create stream with no ack to force timeout
875 _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{
876 Name: "foo",
877 Subjects: []string{"FOO.*"},
878 NoAck: true,
879 })
880 if err != nil {
881 t.Fatalf("Unexpected error: %v", err)
882 }
883
884 now := time.Now()
885 _, err = js.Publish(context.Background(), "FOO.1", []byte("msg"))
886 if !errors.Is(err, context.DeadlineExceeded) {
887 t.Fatalf("Expected deadline exceeded error; got: %v", err)
888 }
889 since := time.Since(now)
890 if since < 200*time.Millisecond || since > 500*time.Millisecond {
891 t.Fatalf("Expected timeout to be around 200ms; got: %v", since)
892 }
893}
894
895func TestPublishMsgAsync(t *testing.T) {
896 type publishConfig struct {

Callers

nothing calls this directly

Calls 10

New · Function · 0.92
WithDefaultTimeout · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
Publish · Method · 0.65
Close · Method · 0.45
Is · Method · 0.45

Tested by

no test coverage detected