(t *testing.T)
| 223 | } |
| 224 | |
| 225 | func TestExtendedReconnectFunctionality(t *testing.T) { |
| 226 | ts := startReconnectServer(t) |
| 227 | defer ts.Shutdown() |
| 228 | |
| 229 | opts := reconnectOpts |
| 230 | dch := make(chan bool, 2) |
| 231 | opts.DisconnectedErrCB = func(_ *nats.Conn, _ error) { |
| 232 | dch <- true |
| 233 | } |
| 234 | rch := make(chan bool, 1) |
| 235 | opts.ReconnectedCB = func(_ *nats.Conn) { |
| 236 | rch <- true |
| 237 | } |
| 238 | nc, err := opts.Connect() |
| 239 | if err != nil { |
| 240 | t.Fatalf("Should have connected ok: %v", err) |
| 241 | } |
| 242 | defer nc.Close() |
| 243 | |
| 244 | testString := "bar" |
| 245 | received := int32(0) |
| 246 | |
| 247 | nc.Subscribe("foo", func(*nats.Msg) { |
| 248 | atomic.AddInt32(&received, 1) |
| 249 | }) |
| 250 | |
| 251 | sub, _ := nc.Subscribe("foobar", func(*nats.Msg) { |
| 252 | atomic.AddInt32(&received, 1) |
| 253 | }) |
| 254 | |
| 255 | nc.Publish("foo", []byte(testString)) |
| 256 | nc.Flush() |
| 257 | |
| 258 | ts.Shutdown() |
| 259 | // server is stopped here.. |
| 260 | |
| 261 | // wait for disconnect |
| 262 | if e := WaitTime(dch, 2*time.Second); e != nil { |
| 263 | t.Fatal("Did not receive a disconnect callback message") |
| 264 | } |
| 265 | |
| 266 | // Sub while disconnected |
| 267 | nc.Subscribe("bar", func(*nats.Msg) { |
| 268 | atomic.AddInt32(&received, 1) |
| 269 | }) |
| 270 | |
| 271 | // Unsub foobar while disconnected |
| 272 | sub.Unsubscribe() |
| 273 | |
| 274 | if err = nc.Publish("foo", []byte(testString)); err != nil { |
| 275 | t.Fatalf("Received an error after disconnect: %v\n", err) |
| 276 | } |
| 277 | |
| 278 | if err = nc.Publish("bar", []byte(testString)); err != nil { |
| 279 | t.Fatalf("Received an error after disconnect: %v\n", err) |
| 280 | } |
| 281 | |
| 282 | ts = startReconnectServer(t) |
nothing calls this directly
no test coverage detected