hub / github.com/redis/go-redis / processPushNotifications

Method processPushNotifications

redis.go:1716–1759  ·  redis.go::baseClient.processPushNotifications

processPushNotifications processes all pending push notifications on a connection. This ensures that cluster topology changes are handled immediately, before the connection is used. The client should call this method before using WithReader for command execution. Performance optimization: Skip

(ctx context.Context, cn *pool.Conn)

Source from the content-addressed store, hash-verified

// was performed recently (within 5 seconds). The health check already verified the connection
// is healthy and checked for unexpected data (push notifications).
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
	// Only process push notifications for RESP3 connections with a processor
	if c.opt.Protocol != 3 || c.pushProcessor == nil {
		return nil
	}

	// Performance optimization: Skip MaybeHasData() syscall if health check was recent
	// If the connection was health-checked within the last 5 seconds, we can skip the
	// expensive syscall since the health check already verified no unexpected data.
	// This is safe because:
	// 0. lastHealthCheckNs is set in pool/conn.go:putConn() after a successful health check
	// 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK)
	// 2. If push notifications arrived, they would have been detected by health check
	// 3. 5 seconds is short enough that connection state is still fresh
	// 4. Push notifications will be processed by the next WithReader call
	// used it is set on getConn, so we should use another timer (lastPutAt?)
	lastHealthCheckNs := cn.LastPutAtNs()
	if lastHealthCheckNs > 0 {
		// Use pool's cached time to avoid expensive time.Now() syscall
		nowNs := pool.GetCachedTimeNs()
		if nowNs-lastHealthCheckNs < int64(5*time.Second) {
			// Recent health check confirmed no unexpected data, skip the syscall
			return nil
		}
	}

	// Check if there is any data to read before processing
	// This is an optimization on UNIX systems where MaybeHasData is a syscall
	// On Windows, MaybeHasData always returns true, so this check is a no-op
	if !cn.MaybeHasData() {
		return nil
	}

	// Use WithReader to access the reader and process push notifications
	// This is critical for maintnotifications to work properly
	// NOTE: almost no timeouts are set for this read, so it should not block
	// longer than necessary, 10us should be plenty of time to read if there are any push notifications
	// on the socket.
	return cn.WithReader(ctx, 10*time.Microsecond, func(rd *proto.Reader) error {
		// Create handler context with client, connection pool, and connection information
		handlerCtx := c.pushNotificationHandlerContext(cn)
		return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
	})
}

// processPendingPushNotificationWithReader processes all pending push notifications on a connection
// This method should be called by the client in WithReader before reading the reply

Callers 5

releaseConn · Method · 0.95
_process · Method · 0.95
pipelineProcessCmds · Method · 0.95
txPipelineProcessCmds · Method · 0.95

Calls 6

GetCachedTimeNs · Function · 0.92
LastPutAtNs · Method · 0.80
MaybeHasData · Method · 0.80
WithReader · Method · 0.80

Tested by

no test coverage detected