MCPcopy
hub / github.com/grafana/dskit / mergeWithTime

Method mergeWithTime

ring/partition_ring_model.go:338–441  ·  view source on GitHub ↗
(mergeable memberlist.Mergeable, localCAS bool, now time.Time)

Source from the content-addressed store, hash-verified

336}
337
338func (m *PartitionRingDesc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now time.Time) (memberlist.Mergeable, error) {
339 if mergeable == nil {
340 return nil, nil
341 }
342
343 other, ok := mergeable.(*PartitionRingDesc)
344 if !ok {
345 return nil, fmt.Errorf("expected *PartitionRingDesc, got %T", mergeable)
346 }
347
348 if other == nil {
349 return nil, nil
350 }
351
352 change := NewPartitionRingDesc()
353
354 // Handle partitions.
355 for id, otherPart := range other.Partitions {
356 changed := false
357
358 thisPart, exists := m.Partitions[id]
359 if !exists {
360 changed = true
361 thisPart = otherPart
362 } else {
363 // We don't merge changes to partition ID and tokens because we expect them to be immutable.
364 //
365 // If in the future we'll change the tokens generation algorithm and we'll have to handle migration to
366 // a different set of tokens then we'll add the support. For example, we could add "token generation version"
367 // to PartitionDesc and then preserve tokens generated by latest version only, or a timestamp for tokens
368 // update too.
369
370 // In case the timestamp is equal we give priority to the deleted state.
371 // Reason is that timestamp has second precision, so we cover the case an
372 // update and subsequent deletion occur within the same second.
373 if otherPart.StateTimestamp > thisPart.StateTimestamp || (otherPart.StateTimestamp == thisPart.StateTimestamp && otherPart.State == PartitionDeleted && thisPart.State != PartitionDeleted) {
374 changed = true
375
376 thisPart.State = otherPart.State
377 thisPart.StateTimestamp = otherPart.StateTimestamp
378 }
379
380 if otherPart.StateChangeLockedTimestamp > thisPart.StateChangeLockedTimestamp {
381 changed = true
382
383 thisPart.StateChangeLocked = otherPart.StateChangeLocked
384 thisPart.StateChangeLockedTimestamp = otherPart.StateChangeLockedTimestamp
385 }
386 }
387
388 if changed {
389 m.Partitions[id] = thisPart
390 change.Partitions[id] = thisPart
391 }
392 }
393
394 if localCAS {
395 // Let's mark all missing partitions in incoming change as deleted.

Callers 1

MergeMethod · 0.95

Calls 2

NewPartitionRingDescFunction · 0.85
ErrorfMethod · 0.80

Tested by

no test coverage detected