MCPcopy
hub / github.com/redis/go-redis / txPipelineReadQueued

Method txPipelineReadQueued

osscluster.go:2026–2086  ·  view source on GitHub ↗
(
	ctx context.Context,
	node *clusterNode,
	cn *pool.Conn,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
)

Source from the content-addressed store, hash-verified

2024}
2025
2026func (c *ClusterClient) txPipelineReadQueued(
2027 ctx context.Context,
2028 node *clusterNode,
2029 cn *pool.Conn,
2030 rd *proto.Reader,
2031 statusCmd *StatusCmd,
2032 cmds []Cmder,
2033 failedCmds *cmdsMap,
2034) error {
2035 // Parse queued replies.
2036 // To be sure there are no buffered push notifications, we process them before reading the reply
2037 if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
2038 // Log the error but don't fail the command execution
2039 // Push notification processing errors shouldn't break normal Redis operations
2040 internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
2041 }
2042 if err := statusCmd.readReply(rd); err != nil {
2043 return err
2044 }
2045
2046 for _, cmd := range cmds {
2047 // To be sure there are no buffered push notifications, we process them before reading the reply
2048 if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
2049 // Log the error but don't fail the command execution
2050 // Push notification processing errors shouldn't break normal Redis operations
2051 internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
2052 }
2053 err := statusCmd.readReply(rd)
2054 if err != nil {
2055 if c.checkMovedErr(ctx, cmd, err, failedCmds) {
2056 // will be processed later
2057 continue
2058 }
2059 cmd.SetErr(err)
2060 if !isRedisError(err) {
2061 return err
2062 }
2063 }
2064 }
2065
2066 // To be sure there are no buffered push notifications, we process them before reading the reply
2067 if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
2068 // Log the error but don't fail the command execution
2069 // Push notification processing errors shouldn't break normal Redis operations
2070 internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
2071 }
2072 // Parse number of replies.
2073 line, err := rd.ReadLine()
2074 if err != nil {
2075 if err == Nil {
2076 err = TxFailedErr
2077 }
2078 return err
2079 }
2080
2081 if line[0] != proto.RespArray {
2082 return fmt.Errorf("redis: expected '*', but got line %q", line)
2083 }

Callers 1

Calls 7

checkMovedErr · Method · 0.95
isRedisError · Function · 0.85
ReadLine · Method · 0.80
Printf · Method · 0.65
readReply · Method · 0.65
SetErr · Method · 0.65

Tested by

no test coverage detected