MCPcopy
hub / github.com/redis/go-redis / processTxPipelineNodeConn

Method processTxPipelineNodeConn

osscluster.go:1991–2024  ·  view source on GitHub ↗
(
	ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
)

Source from the content-addressed store, hash-verified

1989}
1990
1991func (c *ClusterClient) processTxPipelineNodeConn(
1992 ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1993) error {
1994 if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1995 return writeCmds(wr, cmds)
1996 }); err != nil {
1997 if shouldRetry(err, true) && !cmdsContainNoRetry(cmds) {
1998 _ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1999 }
2000 setCmdsErr(cmds, err)
2001 return err
2002 }
2003
2004 return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
2005 statusCmd := cmds[0].(*StatusCmd)
2006 // Trim multi and exec.
2007 trimmedCmds := cmds[1 : len(cmds)-1]
2008
2009 if err := c.txPipelineReadQueued(
2010 ctx, node, cn, rd, statusCmd, trimmedCmds, failedCmds,
2011 ); err != nil {
2012 setCmdsErr(cmds, err)
2013
2014 moved, ask, addr := isMovedError(err)
2015 if moved || ask {
2016 return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
2017 }
2018
2019 return err
2020 }
2021
2022 return node.Client.pipelineReadCmds(ctx, cn, rd, trimmedCmds)
2023 })
2024}
2025
2026func (c *ClusterClient) txPipelineReadQueued(
2027 ctx context.Context,

Callers 1

processTxPipelineNodeMethod · 0.95

Calls 12

contextMethod · 0.95
mapCmdsByNodeMethod · 0.95
txPipelineReadQueuedMethod · 0.95
cmdsMovedMethod · 0.95
writeCmdsFunction · 0.85
shouldRetryFunction · 0.85
cmdsContainNoRetryFunction · 0.85
setCmdsErrFunction · 0.85
isMovedErrorFunction · 0.85
WithWriterMethod · 0.80
WithReaderMethod · 0.80
pipelineReadCmdsMethod · 0.45

Tested by

no test coverage detected