( ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, )
| 1729 | } |
| 1730 | |
| 1731 | func (c *ClusterClient) processPipelineNodeConn( |
| 1732 | ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, |
| 1733 | ) error { |
| 1734 | if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { |
| 1735 | return writeCmds(wr, cmds) |
| 1736 | }); err != nil { |
| 1737 | if isBadConn(err, false, node.Client.getAddr()) { |
| 1738 | node.MarkAsFailing() |
| 1739 | } |
| 1740 | if shouldRetry(err, true) && !cmdsContainNoRetry(cmds) { |
| 1741 | _ = c.mapCmdsByNode(ctx, failedCmds, cmds) |
| 1742 | } |
| 1743 | setCmdsErr(cmds, err) |
| 1744 | return err |
| 1745 | } |
| 1746 | |
| 1747 | return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { |
| 1748 | return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds) |
| 1749 | }) |
| 1750 | } |
| 1751 | |
| 1752 | func (c *ClusterClient) pipelineReadCmds( |
| 1753 | ctx context.Context, |
no test coverage detected