MCPcopy
hub / github.com/redis/go-redis / Watch

Method Watch

osscluster.go:2117–2182  ·  view source on GitHub ↗
(ctx context.Context, fn func(*Tx) error, keys ...string)

Source from the content-addressed store, hash-verified

2115}
2116
2117func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
2118 if len(keys) == 0 {
2119 return errNoWatchKeys
2120 }
2121
2122 slot := hashtag.Slot(keys[0])
2123 for _, key := range keys[1:] {
2124 if hashtag.Slot(key) != slot {
2125 return errWatchCrosslot
2126 }
2127 }
2128
2129 node, err := c.slotMasterNode(ctx, slot)
2130 if err != nil {
2131 return err
2132 }
2133
2134 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
2135 if attempt > 0 {
2136 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
2137 return err
2138 }
2139 }
2140
2141 // Track callback errors separately to avoid retrying user failures through cluster retry classification.
2142 var fnErr error
2143 err = node.Client.Watch(ctx, func(tx *Tx) error {
2144 fnErr = fn(tx)
2145 return fnErr
2146 }, keys...)
2147 if err == nil {
2148 break
2149 }
2150 if fnErr != nil {
2151 return fnErr
2152 }
2153
2154 moved, ask, addr := isMovedError(err)
2155 if moved || ask {
2156 node, err = c.nodes.GetOrCreate(addr)
2157 if err != nil {
2158 return err
2159 }
2160 continue
2161 }
2162
2163 if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
2164 if isReadOnly {
2165 c.state.LazyReload()
2166 }
2167 node, err = c.slotMasterNode(ctx, slot)
2168 if err != nil {
2169 return err
2170 }
2171 continue
2172 }
2173
2174 if shouldRetry(err, true) {

Callers

nothing calls this directly

Calls 10

slotMasterNode (Method) · 0.95
retryBackoff (Method) · 0.95
Slot (Function) · 0.92
Sleep (Function) · 0.92
isMovedError (Function) · 0.85
isReadOnlyError (Function) · 0.85
shouldRetry (Function) · 0.85
GetOrCreate (Method) · 0.80
LazyReload (Method) · 0.80
Watch (Method) · 0.65

Tested by

no test coverage detected