( ctx context.Context, cmds []Cmder, p pipelineProcessor, operationName string, )
| 1136 | type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error) |
| 1137 | |
| 1138 | func (c *baseClient) generalProcessPipeline( |
| 1139 | ctx context.Context, cmds []Cmder, p pipelineProcessor, operationName string, |
| 1140 | ) error { |
| 1141 | // Only call time.Now() if pipeline operation duration callback is set to avoid overhead |
| 1142 | var operationStart time.Time |
| 1143 | pipelineOpDurationCallback := otel.GetPipelineOperationDurationCallback() |
| 1144 | if pipelineOpDurationCallback != nil { |
| 1145 | operationStart = time.Now() |
| 1146 | } |
| 1147 | var lastConn *pool.Conn |
| 1148 | totalAttempts := 0 |
| 1149 | |
| 1150 | var lastErr error |
| 1151 | for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { |
| 1152 | totalAttempts++ |
| 1153 | if attempt > 0 { |
| 1154 | if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { |
| 1155 | setCmdsErr(cmds, err) |
| 1156 | if pipelineOpDurationCallback != nil { |
| 1157 | operationDuration := time.Since(operationStart) |
| 1158 | pipelineOpDurationCallback(ctx, operationDuration, operationName, len(cmds), totalAttempts, err, lastConn, c.opt.DB) |
| 1159 | } |
| 1160 | return err |
| 1161 | } |
| 1162 | } |
| 1163 | |
| 1164 | // Enable retries by default to retry dial errors returned by withConn. |
| 1165 | canRetry := true |
| 1166 | lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { |
| 1167 | lastConn = cn |
| 1168 | // Process any pending push notifications before executing the pipeline |
| 1169 | if err := c.processPushNotifications(ctx, cn); err != nil { |
| 1170 | internal.Logger.Printf(ctx, "push: error processing pending notifications before processing pipeline: %v", err) |
| 1171 | } |
| 1172 | var err error |
| 1173 | canRetry, err = p(ctx, cn, cmds) |
| 1174 | return err |
| 1175 | }) |
| 1176 | // Don't retry if any command in the pipeline explicitly disables retries |
| 1177 | // (e.g., RawWriteToCmd which writes directly to an io.Writer and cannot |
| 1178 | // undo partial writes on retry) |
| 1179 | if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) || cmdsContainNoRetry(cmds) { |
| 1180 | // The error should be set here only when failing to obtain the conn. |
| 1181 | if !isRedisError(lastErr) { |
| 1182 | setCmdsErr(cmds, lastErr) |
| 1183 | } |
| 1184 | if pipelineOpDurationCallback != nil { |
| 1185 | operationDuration := time.Since(operationStart) |
| 1186 | pipelineOpDurationCallback(ctx, operationDuration, operationName, len(cmds), totalAttempts, lastErr, lastConn, c.opt.DB) |
| 1187 | } |
| 1188 | |
| 1189 | if lastErr != nil { |
| 1190 | if errorCallback := pool.GetMetricErrorCallback(); errorCallback != nil { |
| 1191 | errorType, statusCode, isInternal := classifyCommandError(lastErr) |
| 1192 | errorCallback(ctx, errorType, lastConn, statusCode, isInternal, totalAttempts-1) |
| 1193 | } |
| 1194 | } |
| 1195 | return lastErr |
no test coverage detected