(t *testing.T)
| 1425 | } |
| 1426 | |
| 1427 | func TestErrInReadLoop(t *testing.T) { |
| 1428 | serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n" |
| 1429 | |
| 1430 | l, e := net.Listen("tcp", "127.0.0.1:0") |
| 1431 | if e != nil { |
| 1432 | t.Fatal("Could not listen on an ephemeral port") |
| 1433 | } |
| 1434 | tl := l.(*net.TCPListener) |
| 1435 | defer tl.Close() |
| 1436 | |
| 1437 | addr := tl.Addr().(*net.TCPAddr) |
| 1438 | done := make(chan bool) |
| 1439 | cch := make(chan bool) |
| 1440 | |
| 1441 | errCh := make(chan error, 1) |
| 1442 | go func() { |
| 1443 | conn, err := l.Accept() |
| 1444 | if err != nil { |
| 1445 | errCh <- fmt.Errorf("error accepting client connection: %v", err) |
| 1446 | return |
| 1447 | } |
| 1448 | defer conn.Close() |
| 1449 | info := fmt.Sprintf(serverInfo, addr.IP, addr.Port) |
| 1450 | conn.Write([]byte(info)) |
| 1451 | |
| 1452 | // Read connect and ping commands sent from the client |
| 1453 | br := bufio.NewReaderSize(conn, 1024) |
| 1454 | if _, err := br.ReadString('\n'); err != nil { |
| 1455 | errCh <- fmt.Errorf("expected CONNECT from client, got: %s", err) |
| 1456 | return |
| 1457 | } |
| 1458 | if _, err := br.ReadString('\n'); err != nil { |
| 1459 | errCh <- fmt.Errorf("expected PING from client, got: %s", err) |
| 1460 | return |
| 1461 | } |
| 1462 | conn.Write([]byte("PONG\r\n")) |
| 1463 | |
| 1464 | // Read (and ignore) the SUB from the client |
| 1465 | if _, err := br.ReadString('\n'); err != nil { |
| 1466 | errCh <- fmt.Errorf("expected SUB from client, got: %s", err) |
| 1467 | return |
| 1468 | } |
| 1469 | |
| 1470 | // Send something that should make the subscriber fail. |
| 1471 | conn.Write([]byte("Ivan")) |
| 1472 | |
| 1473 | // Hang around until asked to quit |
| 1474 | <-done |
| 1475 | }() |
| 1476 | |
| 1477 | // Wait for server mock to start |
| 1478 | time.Sleep(100 * time.Millisecond) |
| 1479 | |
| 1480 | natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port) |
| 1481 | opts := nats.GetDefaultOptions() |
| 1482 | opts.AllowReconnect = false |
| 1483 | opts.ClosedCB = func(_ *nats.Conn) { cch <- true } |
| 1484 | opts.Servers = []string{natsURL} |
nothing calls this directly
no test coverage detected