( ctx context.Context, cn *pool.Conn, cmds []Cmder, )
| 1252 | } |
| 1253 | |
| 1254 | func (c *baseClient) txPipelineProcessCmds( |
| 1255 | ctx context.Context, cn *pool.Conn, cmds []Cmder, |
| 1256 | ) (bool, error) { |
| 1257 | // Process any pending push notifications before executing the transaction pipeline |
| 1258 | if err := c.processPushNotifications(ctx, cn); err != nil { |
| 1259 | internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err) |
| 1260 | } |
| 1261 | |
| 1262 | if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { |
| 1263 | return writeCmds(wr, cmds) |
| 1264 | }); err != nil { |
| 1265 | setCmdsErr(cmds, err) |
| 1266 | return true, err |
| 1267 | } |
| 1268 | |
| 1269 | if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { |
| 1270 | statusCmd := cmds[0].(*StatusCmd) |
| 1271 | // Trim multi and exec. |
| 1272 | trimmedCmds := cmds[1 : len(cmds)-1] |
| 1273 | |
| 1274 | if err := c.txPipelineReadQueued(ctx, cn, rd, statusCmd, trimmedCmds); err != nil { |
| 1275 | setCmdsErr(cmds, err) |
| 1276 | return err |
| 1277 | } |
| 1278 | |
| 1279 | // Read replies. |
| 1280 | return c.pipelineReadCmds(ctx, cn, rd, trimmedCmds) |
| 1281 | }); err != nil { |
| 1282 | return false, err |
| 1283 | } |
| 1284 | |
| 1285 | return false, nil |
| 1286 | } |
| 1287 | |
| 1288 | // txPipelineReadQueued reads queued replies from the Redis server. |
| 1289 | // It returns an error if the server returns an error or if the number of replies does not match the number of commands. |
nothing calls this directly
no test coverage detected