TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined.
()
| 164 | |
| 165 | // TxPipeline creates a pipeline. Usually it is more convenient to use TxPipelined. |
| 166 | func (c *Tx) TxPipeline() Pipeliner { |
| 167 | pipe := Pipeline{ |
| 168 | exec: func(ctx context.Context, cmds []Cmder) error { |
| 169 | cmds = wrapMultiExec(ctx, cmds) |
| 170 | err := c.processTxPipelineHook(ctx, cmds) |
| 171 | // EXEC discards the watched keys server-side, so the watch is |
| 172 | // cleared only when EXEC actually ran: a nil error (committed), |
| 173 | // TxFailedErr (a watched key changed, EXEC returned nil), or an |
| 174 | // EXECABORT error (a queued command was rejected, so EXEC discarded |
| 175 | // the transaction). All three release the watched keys. Any other |
| 176 | // error may be reported before EXEC executes (for example -LOADING on |
| 177 | // the MULTI reply) or on a broken connection, so leave watchArmed set |
| 178 | // and let Close send UNWATCH rather than risk leaving a watch on a |
| 179 | // pooled connection. |
| 180 | if err == nil || errors.Is(err, TxFailedErr) || IsExecAbortError(err) { |
| 181 | c.watchArmed = false |
| 182 | } |
| 183 | return err |
| 184 | }, |
| 185 | } |
| 186 | pipe.init() |
| 187 | return &pipe |
| 188 | } |
| 189 | |
| 190 | func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder { |
| 191 | if len(cmds) == 0 { |
no test coverage detected