processPushNotifications processes all pending push notifications on a connection. This ensures that cluster topology changes are handled immediately before the connection is used. This method should be called by the client before using WithReader for command execution. Performance optimization: Skip the MaybeHasData() syscall if a health check was performed recently (within 5 seconds).
(ctx context.Context, cn *pool.Conn)
| 1714 | // was performed recently (within 5 seconds). The health check already verified the connection |
| 1715 | // is healthy and checked for unexpected data (push notifications). |
| 1716 | func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { |
| 1717 | // Only process push notifications for RESP3 connections with a processor |
| 1718 | if c.opt.Protocol != 3 || c.pushProcessor == nil { |
| 1719 | return nil |
| 1720 | } |
| 1721 | |
| 1722 | // Performance optimization: Skip MaybeHasData() syscall if health check was recent |
| 1723 | // If the connection was health-checked within the last 5 seconds, we can skip the |
| 1724 | // expensive syscall since the health check already verified no unexpected data. |
| 1725 | // This is safe because: |
| 1726 | // 0. lastHealthCheckNs is set in pool/conn.go:putConn() after a successful health check |
| 1727 | // 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK) |
| 1728 | // 2. If push notifications arrived, they would have been detected by health check |
| 1729 | // 3. 5 seconds is short enough that connection state is still fresh |
| 1730 | // 4. Push notifications will be processed by the next WithReader call |
| 1731 | // used it is set on getConn, so we should use another timer (lastPutAt?) |
| 1732 | lastHealthCheckNs := cn.LastPutAtNs() |
| 1733 | if lastHealthCheckNs > 0 { |
| 1734 | // Use pool's cached time to avoid expensive time.Now() syscall |
| 1735 | nowNs := pool.GetCachedTimeNs() |
| 1736 | if nowNs-lastHealthCheckNs < int64(5*time.Second) { |
| 1737 | // Recent health check confirmed no unexpected data, skip the syscall |
| 1738 | return nil |
| 1739 | } |
| 1740 | } |
| 1741 | |
| 1742 | // Check if there is any data to read before processing |
| 1743 | // This is an optimization on UNIX systems where MaybeHasData is a syscall |
| 1744 | // On Windows, MaybeHasData always returns true, so this check is a no-op |
| 1745 | if !cn.MaybeHasData() { |
| 1746 | return nil |
| 1747 | } |
| 1748 | |
| 1749 | // Use WithReader to access the reader and process push notifications |
| 1750 | // This is critical for maintnotifications to work properly |
| 1751 | // NOTE: almost no timeouts are set for this read, so it should not block |
| 1752 | // longer than necessary, 10us should be plenty of time to read if there are any push notifications |
| 1753 | // on the socket. |
| 1754 | return cn.WithReader(ctx, 10*time.Microsecond, func(rd *proto.Reader) error { |
| 1755 | // Create handler context with client, connection pool, and connection information |
| 1756 | handlerCtx := c.pushNotificationHandlerContext(cn) |
| 1757 | return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) |
| 1758 | }) |
| 1759 | } |
| 1760 | |
| 1761 | // processPendingPushNotificationWithReader processes all pending push notifications on a connection |
| 1762 | // This method should be called by the client in WithReader before reading the reply |
no test coverage detected