MCPcopy
hub / github.com/grafana/dskit / mergeWithTime

Method mergeWithTime

ring/model.go:226–303  ·  view source on GitHub ↗
(mergeable memberlist.Mergeable, localCAS bool, now time.Time)

Source from the content-addressed store, hash-verified

224}
225
226func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now time.Time) (memberlist.Mergeable, error) {
227 if mergeable == nil {
228 return nil, nil
229 }
230
231 other, ok := mergeable.(*Desc)
232 if !ok {
233 return nil, fmt.Errorf("expected *ring.Desc, got %T", mergeable)
234 }
235
236 if other == nil {
237 return nil, nil
238 }
239
240 normalizeIngestersMap(other)
241
242 thisIngesterMap := d.Ingesters
243 otherIngesterMap := other.Ingesters
244
245 var updated []string
246 tokensChanged := false
247
248 for name, oing := range otherIngesterMap {
249 ting := thisIngesterMap[name]
250 // ting.Timestamp will be 0, if there was no such ingester in our version
251 if oing.Timestamp > ting.Timestamp {
252 if !tokensEqual(ting.Tokens, oing.Tokens) {
253 tokensChanged = true
254 }
255 oing.Tokens = append([]uint32(nil), oing.Tokens...) // make a copy of tokens
256 thisIngesterMap[name] = oing
257 updated = append(updated, name)
258 } else if oing.Timestamp == ting.Timestamp && ting.State != LEFT && oing.State == LEFT {
259 // we accept LEFT even if timestamp hasn't changed
260 thisIngesterMap[name] = oing // has no tokens already
261 updated = append(updated, name)
262 }
263 }
264
265 if localCAS {
266 // This breaks commutativity! But we only do it locally, not when gossiping with others.
267 for name, ting := range thisIngesterMap {
268 if _, ok := otherIngesterMap[name]; !ok && ting.State != LEFT {
269 // missing, let's mark our ingester as LEFT
270 ting.State = LEFT
271 ting.Tokens = nil
272 // We are deleting entry "now", and should not keep old timestamp, because there may already be pending
273 // message in the gossip network with newer timestamp (but still older than "now").
274 // Such message would "resurrect" this deleted entry.
275 ting.Timestamp = now.Unix()
276 thisIngesterMap[name] = ting
277
278 updated = append(updated, name)
279 }
280 }
281 }
282
283 // No updated ingesters

Callers 4

MergeMethod · 0.95
mergeLocalCASFunction · 0.45

Calls 6

normalizeIngestersMapFunction · 0.85
tokensEqualFunction · 0.85
conflictingTokensExistFunction · 0.85
resolveConflictsFunction · 0.85
NewDescFunction · 0.85
ErrorfMethod · 0.80