( ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, )
| 1989 | } |
| 1990 | |
| 1991 | func (c *ClusterClient) processTxPipelineNodeConn( |
| 1992 | ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, |
| 1993 | ) error { |
| 1994 | if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { |
| 1995 | return writeCmds(wr, cmds) |
| 1996 | }); err != nil { |
| 1997 | if shouldRetry(err, true) && !cmdsContainNoRetry(cmds) { |
| 1998 | _ = c.mapCmdsByNode(ctx, failedCmds, cmds) |
| 1999 | } |
| 2000 | setCmdsErr(cmds, err) |
| 2001 | return err |
| 2002 | } |
| 2003 | |
| 2004 | return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { |
| 2005 | statusCmd := cmds[0].(*StatusCmd) |
| 2006 | // Trim multi and exec. |
| 2007 | trimmedCmds := cmds[1 : len(cmds)-1] |
| 2008 | |
| 2009 | if err := c.txPipelineReadQueued( |
| 2010 | ctx, node, cn, rd, statusCmd, trimmedCmds, failedCmds, |
| 2011 | ); err != nil { |
| 2012 | setCmdsErr(cmds, err) |
| 2013 | |
| 2014 | moved, ask, addr := isMovedError(err) |
| 2015 | if moved || ask { |
| 2016 | return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds) |
| 2017 | } |
| 2018 | |
| 2019 | return err |
| 2020 | } |
| 2021 | |
| 2022 | return node.Client.pipelineReadCmds(ctx, cn, rd, trimmedCmds) |
| 2023 | }) |
| 2024 | } |
| 2025 | |
| 2026 | func (c *ClusterClient) txPipelineReadQueued( |
| 2027 | ctx context.Context, |
no test coverage detected