( ctx context.Context, node *clusterNode, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder, failedCmds *cmdsMap, )
| 2024 | } |
| 2025 | |
| 2026 | func (c *ClusterClient) txPipelineReadQueued( |
| 2027 | ctx context.Context, |
| 2028 | node *clusterNode, |
| 2029 | cn *pool.Conn, |
| 2030 | rd *proto.Reader, |
| 2031 | statusCmd *StatusCmd, |
| 2032 | cmds []Cmder, |
| 2033 | failedCmds *cmdsMap, |
| 2034 | ) error { |
| 2035 | // Parse queued replies. |
| 2036 | // To be sure there are no buffered push notifications, we process them before reading the reply |
| 2037 | if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { |
| 2038 | // Log the error but don't fail the command execution |
| 2039 | // Push notification processing errors shouldn't break normal Redis operations |
| 2040 | internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) |
| 2041 | } |
| 2042 | if err := statusCmd.readReply(rd); err != nil { |
| 2043 | return err |
| 2044 | } |
| 2045 | |
| 2046 | for _, cmd := range cmds { |
| 2047 | // To be sure there are no buffered push notifications, we process them before reading the reply |
| 2048 | if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { |
| 2049 | // Log the error but don't fail the command execution |
| 2050 | // Push notification processing errors shouldn't break normal Redis operations |
| 2051 | internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) |
| 2052 | } |
| 2053 | err := statusCmd.readReply(rd) |
| 2054 | if err != nil { |
| 2055 | if c.checkMovedErr(ctx, cmd, err, failedCmds) { |
| 2056 | // will be processed later |
| 2057 | continue |
| 2058 | } |
| 2059 | cmd.SetErr(err) |
| 2060 | if !isRedisError(err) { |
| 2061 | return err |
| 2062 | } |
| 2063 | } |
| 2064 | } |
| 2065 | |
| 2066 | // To be sure there are no buffered push notifications, we process them before reading the reply |
| 2067 | if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { |
| 2068 | // Log the error but don't fail the command execution |
| 2069 | // Push notification processing errors shouldn't break normal Redis operations |
| 2070 | internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) |
| 2071 | } |
| 2072 | // Parse number of replies. |
| 2073 | line, err := rd.ReadLine() |
| 2074 | if err != nil { |
| 2075 | if err == Nil { |
| 2076 | err = TxFailedErr |
| 2077 | } |
| 2078 | return err |
| 2079 | } |
| 2080 | |
| 2081 | if line[0] != proto.RespArray { |
| 2082 | return fmt.Errorf("redis: expected '*', but got line %q", line) |
| 2083 | } |
no test coverage detected