MCPcopy
hub / github.com/nats-io/nats.go / TestConnStatusChangedEvents

Function TestConnStatusChangedEvents

test/conn_test.go:3172–3262  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

3170}
3171
3172func TestConnStatusChangedEvents(t *testing.T) {
3173 t.Run("default events", func(t *testing.T) {
3174 s := RunDefaultServer()
3175 nc, err := nats.Connect(s.ClientURL())
3176 if err != nil {
3177 t.Fatalf("Unexpected error: %s", err)
3178 }
3179 statusCh := nc.StatusChanged()
3180 defer close(statusCh)
3181 newStatus := make(chan nats.Status, 10)
3182 // non-blocking channel, so we need to be constantly listening
3183 go func() {
3184 for {
3185 s, ok := <-statusCh
3186 if !ok {
3187 return
3188 }
3189 newStatus <- s
3190 }
3191 }()
3192 time.Sleep(50 * time.Millisecond)
3193
3194 s.Shutdown()
3195 WaitOnChannel(t, newStatus, nats.RECONNECTING)
3196
3197 s = RunDefaultServer()
3198 defer s.Shutdown()
3199
3200 WaitOnChannel(t, newStatus, nats.CONNECTED)
3201
3202 nc.Close()
3203 WaitOnChannel(t, newStatus, nats.CLOSED)
3204
3205 select {
3206 case s := <-newStatus:
3207 t.Fatalf("Unexpected status received: %s", s)
3208 case <-time.After(100 * time.Millisecond):
3209 }
3210 })
3211
3212 t.Run("custom event only", func(t *testing.T) {
3213 s := RunDefaultServer()
3214 nc, err := nats.Connect(s.ClientURL())
3215 if err != nil {
3216 t.Fatalf("Unexpected error: %s", err)
3217 }
3218 statusCh := nc.StatusChanged(nats.CLOSED)
3219 defer close(statusCh)
3220 newStatus := make(chan nats.Status, 10)
3221 // non-blocking channel, so we need to be constantly listening
3222 go func() {
3223 for {
3224 s, ok := <-statusCh
3225 if !ok {
3226 return
3227 }
3228 newStatus <- s
3229 }

Callers

nothing calls this directly

Calls 7

WaitOnChannel (Function) · 0.85
Connect (Method) · 0.80
Fatalf (Method) · 0.80
RunDefaultServer (Function) · 0.70
Publish (Method) · 0.65
StatusChanged (Method) · 0.45
Close (Method) · 0.45

Tested by

no test coverage detected