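startHealthCheck starts the health checking stream (RPC) to watch the health stats of this connection if health checking is requested and configured. LB channel health checking is enabled when all requirements below are met: 1. it is not disabled by the user with the WithDisableHealthCheck DialOption; 2. the channel provides a non-nil health check config (ac.cc.healthCheckConfig()); 3. health checking is enabled in the subchannel options (ac.scopts.HealthCheckEnabled); 4. internal.HealthCheckFunc is set, which requires the health package to be imported. If any requirement is not met, the health checking stream is not started and the addrConn is set to READY.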
(ctx context.Context)
| 1608 | // |
| 1609 | // Caller must hold ac.mu. |
| 1610 | func (ac *addrConn) startHealthCheck(ctx context.Context) { |
| 1611 | var healthcheckManagingState bool |
| 1612 | defer func() { |
| 1613 | if !healthcheckManagingState { |
| 1614 | ac.updateConnectivityState(connectivity.Ready, nil) |
| 1615 | } |
| 1616 | }() |
| 1617 | |
| 1618 | if ac.cc.dopts.disableHealthCheck { |
| 1619 | return |
| 1620 | } |
| 1621 | healthCheckConfig := ac.cc.healthCheckConfig() |
| 1622 | if healthCheckConfig == nil { |
| 1623 | return |
| 1624 | } |
| 1625 | if !ac.scopts.HealthCheckEnabled { |
| 1626 | return |
| 1627 | } |
| 1628 | healthCheckFunc := internal.HealthCheckFunc |
| 1629 | if healthCheckFunc == nil { |
| 1630 | // The health package is not imported to set health check function. |
| 1631 | // |
| 1632 | // TODO: add a link to the health check doc in the error message. |
| 1633 | channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.") |
| 1634 | return |
| 1635 | } |
| 1636 | |
| 1637 | healthcheckManagingState = true |
| 1638 | |
| 1639 | // Set up the health check helper functions. |
| 1640 | currentTr := ac.transport |
| 1641 | newStream := func(method string) (any, error) { |
| 1642 | ac.mu.Lock() |
| 1643 | if ac.transport != currentTr { |
| 1644 | ac.mu.Unlock() |
| 1645 | return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") |
| 1646 | } |
| 1647 | ac.mu.Unlock() |
| 1648 | return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) |
| 1649 | } |
| 1650 | setConnectivityState := func(s connectivity.State, lastErr error) { |
| 1651 | ac.mu.Lock() |
| 1652 | defer ac.mu.Unlock() |
| 1653 | if ac.transport != currentTr { |
| 1654 | return |
| 1655 | } |
| 1656 | ac.updateConnectivityState(s, lastErr) |
| 1657 | } |
| 1658 | // Start the health checking stream. |
| 1659 | go func() { |
| 1660 | err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) |
| 1661 | if err != nil { |
| 1662 | if status.Code(err) == codes.Unimplemented { |
| 1663 | channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled") |
| 1664 | } else { |
| 1665 | channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err) |
| 1666 | } |
| 1667 | } |
no test coverage detected