MCPcopy
hub / github.com/grafana/dskit / updatePartitionRing

Method updatePartitionRing

ring/partition_ring_watcher.go:109–142  ·  view source on GitHub ↗
(desc *PartitionRingDesc)

Source from the content-addressed store, hash-verified

107}
108
109func (w *PartitionRingWatcher) updatePartitionRing(desc *PartitionRingDesc) error {
110 newRing, err := NewPartitionRingWithOptions(*desc, w.opts)
111 if err != nil {
112 return errors.Wrap(err, "failed to create partition ring from descriptor")
113 }
114 w.ringMx.Lock()
115 oldRing := w.ring
116 w.ring = newRing
117 w.ringMx.Unlock()
118
119 if w.delegate != nil {
120 w.delegate.OnPartitionRingChanged(&oldRing.desc, desc)
121 }
122
123 // Update metrics.
124 for state, count := range desc.countPartitionsByState() {
125 w.numPartitionsGaugeVec.WithLabelValues(state.CleanName()).Set(float64(count))
126 }
127
128 // Check partitions whose state change lock status has changed and log them.
129 for partitionID, partition := range desc.Partitions {
130 state := partition.GetState().CleanName()
131
132 oldPartition, existedBefore := oldRing.desc.Partitions[partitionID]
133 if !existedBefore || partition.StateChangeLocked != oldPartition.StateChangeLocked {
134 if partition.StateChangeLocked {
135 level.Warn(w.logger).Log("msg", "partition state change is locked", "partition_id", partitionID, "partition_state", state)
136 } else if existedBefore {
137 level.Info(w.logger).Log("msg", "partition state change is unlocked", "partition_id", partitionID, "partition_state", state)
138 }
139 }
140 }
141 return nil
142}
143
144// PartitionRing returns the most updated snapshot of the PartitionRing. The returned instance
145// is immutable and will not be updated if new changes are done to the ring.

Callers 2

startingMethod · 0.95
loopMethod · 0.95

Calls 8

CleanNameMethod · 0.80
WrapMethod · 0.65
SetMethod · 0.65
GetStateMethod · 0.45
LogMethod · 0.45

Tested by

no test coverage detected