MCPcopy
hub / github.com/redis/go-redis / generalProcessPipeline

Method generalProcessPipeline

redis.go:1138–1210  ·  view source on GitHub ↗
(
	ctx context.Context, cmds []Cmder, p pipelineProcessor, operationName string,
)

Source from the content-addressed store, hash-verified

// pipelineProcessor is the callback generalProcessPipeline invokes once per
// attempt with a pooled connection and the pipeline's commands. The boolean
// result reports whether a failed attempt may be retried; the error is the
// outcome of that attempt.
1136type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
1137
1138func (c *baseClient) generalProcessPipeline(
1139 ctx context.Context, cmds []Cmder, p pipelineProcessor, operationName string,
1140) error {
1141 // Only call time.Now() if pipeline operation duration callback is set to avoid overhead
1142 var operationStart time.Time
1143 pipelineOpDurationCallback := otel.GetPipelineOperationDurationCallback()
1144 if pipelineOpDurationCallback != nil {
1145 operationStart = time.Now()
1146 }
1147 var lastConn *pool.Conn
1148 totalAttempts := 0
1149
1150 var lastErr error
1151 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
1152 totalAttempts++
1153 if attempt > 0 {
1154 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1155 setCmdsErr(cmds, err)
1156 if pipelineOpDurationCallback != nil {
1157 operationDuration := time.Since(operationStart)
1158 pipelineOpDurationCallback(ctx, operationDuration, operationName, len(cmds), totalAttempts, err, lastConn, c.opt.DB)
1159 }
1160 return err
1161 }
1162 }
1163
1164 // Enable retries by default to retry dial errors returned by withConn.
1165 canRetry := true
1166 lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1167 lastConn = cn
1168 // Process any pending push notifications before executing the pipeline
1169 if err := c.processPushNotifications(ctx, cn); err != nil {
1170 internal.Logger.Printf(ctx, "push: error processing pending notifications before processing pipeline: %v", err)
1171 }
1172 var err error
1173 canRetry, err = p(ctx, cn, cmds)
1174 return err
1175 })
1176 // Don't retry if any command in the pipeline explicitly disables retries
1177 // (e.g., RawWriteToCmd which writes directly to an io.Writer and cannot
1178 // undo partial writes on retry)
1179 if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) || cmdsContainNoRetry(cmds) {
1180 // The error should be set here only when failing to obtain the conn.
1181 if !isRedisError(lastErr) {
1182 setCmdsErr(cmds, lastErr)
1183 }
1184 if pipelineOpDurationCallback != nil {
1185 operationDuration := time.Since(operationStart)
1186 pipelineOpDurationCallback(ctx, operationDuration, operationName, len(cmds), totalAttempts, lastErr, lastConn, c.opt.DB)
1187 }
1188
1189 if lastErr != nil {
1190 if errorCallback := pool.GetMetricErrorCallback(); errorCallback != nil {
1191 errorType, statusCode, isInternal := classifyCommandError(lastErr)
1192 errorCallback(ctx, errorType, lastConn, statusCode, isInternal, totalAttempts-1)
1193 }
1194 }
1195 return lastErr

Callers 2

processPipeline (Method) · 0.95
processTxPipeline (Method) · 0.95

Calls 12

retryBackoff (Method) · 0.95
withConn (Method) · 0.95
Sleep (Function) · 0.92
GetMetricErrorCallback (Function) · 0.92
setCmdsErr (Function) · 0.85
shouldRetry (Function) · 0.85
cmdsContainNoRetry (Function) · 0.85
isRedisError (Function) · 0.85
classifyCommandError (Function) · 0.85
Printf (Method) · 0.65

Tested by

no test coverage detected