(desc *PartitionRingDesc)
| 107 | } |
| 108 | |
| 109 | func (w *PartitionRingWatcher) updatePartitionRing(desc *PartitionRingDesc) error { |
| 110 | newRing, err := NewPartitionRingWithOptions(*desc, w.opts) |
| 111 | if err != nil { |
| 112 | return errors.Wrap(err, "failed to create partition ring from descriptor") |
| 113 | } |
| 114 | w.ringMx.Lock() |
| 115 | oldRing := w.ring |
| 116 | w.ring = newRing |
| 117 | w.ringMx.Unlock() |
| 118 | |
| 119 | if w.delegate != nil { |
| 120 | w.delegate.OnPartitionRingChanged(&oldRing.desc, desc) |
| 121 | } |
| 122 | |
| 123 | // Update metrics. |
| 124 | for state, count := range desc.countPartitionsByState() { |
| 125 | w.numPartitionsGaugeVec.WithLabelValues(state.CleanName()).Set(float64(count)) |
| 126 | } |
| 127 | |
| 128 | // Check partitions whose state change lock status has changed and log them. |
| 129 | for partitionID, partition := range desc.Partitions { |
| 130 | state := partition.GetState().CleanName() |
| 131 | |
| 132 | oldPartition, existedBefore := oldRing.desc.Partitions[partitionID] |
| 133 | if !existedBefore || partition.StateChangeLocked != oldPartition.StateChangeLocked { |
| 134 | if partition.StateChangeLocked { |
| 135 | level.Warn(w.logger).Log("msg", "partition state change is locked", "partition_id", partitionID, "partition_state", state) |
| 136 | } else if existedBefore { |
| 137 | level.Info(w.logger).Log("msg", "partition state change is unlocked", "partition_id", partitionID, "partition_state", state) |
| 138 | } |
| 139 | } |
| 140 | } |
| 141 | return nil |
| 142 | } |
| 143 | |
| 144 | // PartitionRing returns the most updated snapshot of the PartitionRing. The returned instance |
| 145 | // is immutable and will not be updated if new changes are done to the ring. |
no test coverage detected