| 224 | } |
| 225 | |
| 226 | func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now time.Time) (memberlist.Mergeable, error) { |
| 227 | if mergeable == nil { |
| 228 | return nil, nil |
| 229 | } |
| 230 | |
| 231 | other, ok := mergeable.(*Desc) |
| 232 | if !ok { |
| 233 | return nil, fmt.Errorf("expected *ring.Desc, got %T", mergeable) |
| 234 | } |
| 235 | |
| 236 | if other == nil { |
| 237 | return nil, nil |
| 238 | } |
| 239 | |
| 240 | normalizeIngestersMap(other) |
| 241 | |
| 242 | thisIngesterMap := d.Ingesters |
| 243 | otherIngesterMap := other.Ingesters |
| 244 | |
| 245 | var updated []string |
| 246 | tokensChanged := false |
| 247 | |
| 248 | for name, oing := range otherIngesterMap { |
| 249 | ting := thisIngesterMap[name] |
| 250 | // ting.Timestamp will be 0, if there was no such ingester in our version |
| 251 | if oing.Timestamp > ting.Timestamp { |
| 252 | if !tokensEqual(ting.Tokens, oing.Tokens) { |
| 253 | tokensChanged = true |
| 254 | } |
| 255 | oing.Tokens = append([]uint32(nil), oing.Tokens...) // make a copy of tokens |
| 256 | thisIngesterMap[name] = oing |
| 257 | updated = append(updated, name) |
| 258 | } else if oing.Timestamp == ting.Timestamp && ting.State != LEFT && oing.State == LEFT { |
| 259 | // we accept LEFT even if timestamp hasn't changed |
| 260 | thisIngesterMap[name] = oing // has no tokens already |
| 261 | updated = append(updated, name) |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | if localCAS { |
| 266 | // This breaks commutativity! But we only do it locally, not when gossiping with others. |
| 267 | for name, ting := range thisIngesterMap { |
| 268 | if _, ok := otherIngesterMap[name]; !ok && ting.State != LEFT { |
| 269 | // missing, let's mark our ingester as LEFT |
| 270 | ting.State = LEFT |
| 271 | ting.Tokens = nil |
| 272 | // We are deleting entry "now", and should not keep old timestamp, because there may already be pending |
| 273 | // message in the gossip network with newer timestamp (but still older than "now"). |
| 274 | // Such message would "resurrect" this deleted entry. |
| 275 | ting.Timestamp = now.Unix() |
| 276 | thisIngesterMap[name] = ting |
| 277 | |
| 278 | updated = append(updated, name) |
| 279 | } |
| 280 | } |
| 281 | } |
| 282 | |
| 283 | // No updated ingesters |