MCPcopy
hub / github.com/redis/go-redis / _process

Method _process

redis.go:958–1013  ·  view source on GitHub ↗
(ctx context.Context, cmd Cmder, attempt int)

Source from the content-addressed store, hash-verified

956}
957
958func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, *pool.Conn, error) {
959 if attempt > 0 {
960 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
961 return false, nil, err
962 }
963 }
964
965 var usedConn *pool.Conn
966 retryTimeout := uint32(0)
967 if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
968 usedConn = cn
969 // Process any pending push notifications before executing the command
970 if err := c.processPushNotifications(ctx, cn); err != nil {
971 internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
972 }
973
974 if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
975 return writeCmd(wr, cmd)
976 }); err != nil {
977 atomic.StoreUint32(&retryTimeout, 1)
978 return err
979 }
980 readReplyFunc := cmd.readReply
981 // Apply unstable RESP3 search module.
982 if c.opt.Protocol != 2 {
983 useRawReply, err := c.assertUnstableCommand(cmd)
984 if err != nil {
985 return err
986 }
987 if useRawReply {
988 readReplyFunc = cmd.readRawReply
989 }
990 }
991 if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
992 // To be sure there are no buffered push notifications, we process them before reading the reply
993 if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
994 internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
995 }
996 return readReplyFunc(rd)
997 }); err != nil {
998 if cmd.readTimeout() == nil {
999 atomic.StoreUint32(&retryTimeout, 1)
1000 } else {
1001 atomic.StoreUint32(&retryTimeout, 0)
1002 }
1003 return err
1004 }
1005
1006 return nil
1007 }); err != nil {
1008 retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
1009 return retry, usedConn, err
1010 }
1011
1012 return false, usedConn, nil
1013}
1014
1015func (c *baseClient) retryBackoff(attempt int) time.Duration {

Callers 1

processMethod · 0.95

Calls 14

retryBackoffMethod · 0.95
withConnMethod · 0.95
contextMethod · 0.95
assertUnstableCommandMethod · 0.95
cmdTimeoutMethod · 0.95
SleepFunction · 0.92
writeCmdFunction · 0.85
shouldRetryFunction · 0.85
WithWriterMethod · 0.80
WithReaderMethod · 0.80

Tested by

no test coverage detected