(ctx context.Context, fn func(*Tx) error, keys ...string)
| 2115 | } |
| 2116 | |
| 2117 | func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { |
| 2118 | if len(keys) == 0 { |
| 2119 | return errNoWatchKeys |
| 2120 | } |
| 2121 | |
| 2122 | slot := hashtag.Slot(keys[0]) |
| 2123 | for _, key := range keys[1:] { |
| 2124 | if hashtag.Slot(key) != slot { |
| 2125 | return errWatchCrosslot |
| 2126 | } |
| 2127 | } |
| 2128 | |
| 2129 | node, err := c.slotMasterNode(ctx, slot) |
| 2130 | if err != nil { |
| 2131 | return err |
| 2132 | } |
| 2133 | |
| 2134 | for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { |
| 2135 | if attempt > 0 { |
| 2136 | if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { |
| 2137 | return err |
| 2138 | } |
| 2139 | } |
| 2140 | |
| 2141 | // Track callback errors separately to avoid retrying user failures through cluster retry classification. |
| 2142 | var fnErr error |
| 2143 | err = node.Client.Watch(ctx, func(tx *Tx) error { |
| 2144 | fnErr = fn(tx) |
| 2145 | return fnErr |
| 2146 | }, keys...) |
| 2147 | if err == nil { |
| 2148 | break |
| 2149 | } |
| 2150 | if fnErr != nil { |
| 2151 | return fnErr |
| 2152 | } |
| 2153 | |
| 2154 | moved, ask, addr := isMovedError(err) |
| 2155 | if moved || ask { |
| 2156 | node, err = c.nodes.GetOrCreate(addr) |
| 2157 | if err != nil { |
| 2158 | return err |
| 2159 | } |
| 2160 | continue |
| 2161 | } |
| 2162 | |
| 2163 | if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed { |
| 2164 | if isReadOnly { |
| 2165 | c.state.LazyReload() |
| 2166 | } |
| 2167 | node, err = c.slotMasterNode(ctx, slot) |
| 2168 | if err != nil { |
| 2169 | return err |
| 2170 | } |
| 2171 | continue |
| 2172 | } |
| 2173 | |
| 2174 | if shouldRetry(err, true) { |
nothing calls this directly
no test coverage detected