(ctx context.Context, cmd Cmder, attempt int)
| 956 | } |
| 957 | |
| 958 | func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, *pool.Conn, error) { |
| 959 | if attempt > 0 { |
| 960 | if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { |
| 961 | return false, nil, err |
| 962 | } |
| 963 | } |
| 964 | |
| 965 | var usedConn *pool.Conn |
| 966 | retryTimeout := uint32(0) |
| 967 | if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { |
| 968 | usedConn = cn |
| 969 | // Process any pending push notifications before executing the command |
| 970 | if err := c.processPushNotifications(ctx, cn); err != nil { |
| 971 | internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err) |
| 972 | } |
| 973 | |
| 974 | if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { |
| 975 | return writeCmd(wr, cmd) |
| 976 | }); err != nil { |
| 977 | atomic.StoreUint32(&retryTimeout, 1) |
| 978 | return err |
| 979 | } |
| 980 | readReplyFunc := cmd.readReply |
| 981 | // Apply unstable RESP3 search module. |
| 982 | if c.opt.Protocol != 2 { |
| 983 | useRawReply, err := c.assertUnstableCommand(cmd) |
| 984 | if err != nil { |
| 985 | return err |
| 986 | } |
| 987 | if useRawReply { |
| 988 | readReplyFunc = cmd.readRawReply |
| 989 | } |
| 990 | } |
| 991 | if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { |
| 992 | // To be sure there are no buffered push notifications, we process them before reading the reply |
| 993 | if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { |
| 994 | internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) |
| 995 | } |
| 996 | return readReplyFunc(rd) |
| 997 | }); err != nil { |
| 998 | if cmd.readTimeout() == nil { |
| 999 | atomic.StoreUint32(&retryTimeout, 1) |
| 1000 | } else { |
| 1001 | atomic.StoreUint32(&retryTimeout, 0) |
| 1002 | } |
| 1003 | return err |
| 1004 | } |
| 1005 | |
| 1006 | return nil |
| 1007 | }); err != nil { |
| 1008 | retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) |
| 1009 | return retry, usedConn, err |
| 1010 | } |
| 1011 | |
| 1012 | return false, usedConn, nil |
| 1013 | } |
| 1014 | |
| 1015 | func (c *baseClient) retryBackoff(attempt int) time.Duration { |
no test coverage detected