| 201 | } |
| 202 | |
| 203 | func (i *instance) updateProcessors() error { |
| 204 | desiredProcessors := i.filterSupportedProcessors(i.overrides.MetricsGeneratorProcessors(i.instanceID)) |
| 205 | desiredCfg, err := i.cfg.Processor.copyWithOverrides(i.overrides, i.instanceID) |
| 206 | if err != nil { |
| 207 | return err |
| 208 | } |
| 209 | |
| 210 | ingestionSlackInt := i.overrides.MetricsGeneratorIngestionSlack(i.instanceID).Nanoseconds() |
| 211 | if ingestionSlackInt == 0 { |
| 212 | ingestionSlackInt = i.cfg.MetricsIngestionSlack.Nanoseconds() |
| 213 | } |
| 214 | |
| 215 | i.ingestionSlackOverride.Store(ingestionSlackInt) |
| 216 | |
| 217 | desiredProcessors, desiredCfg = i.updateSubprocessors(desiredProcessors, desiredCfg) |
| 218 | |
| 219 | i.processorsMtx.RLock() |
| 220 | toAdd, toRemove, toReplace, err := i.diffProcessors(desiredProcessors, desiredCfg) |
| 221 | i.processorsMtx.RUnlock() |
| 222 | |
| 223 | if err != nil { |
| 224 | return err |
| 225 | } |
| 226 | if len(toAdd) == 0 && len(toRemove) == 0 && len(toReplace) == 0 { |
| 227 | return nil |
| 228 | } |
| 229 | |
| 230 | i.processorsMtx.Lock() |
| 231 | defer i.processorsMtx.Unlock() |
| 232 | |
| 233 | for _, processorName := range toAdd { |
| 234 | err := i.addProcessor(processorName, desiredCfg) |
| 235 | if err != nil { |
| 236 | return err |
| 237 | } |
| 238 | } |
| 239 | for _, processorName := range toRemove { |
| 240 | i.removeProcessor(processorName) |
| 241 | } |
| 242 | for _, processorName := range toReplace { |
| 243 | i.removeProcessor(processorName) |
| 244 | |
| 245 | err := i.addProcessor(processorName, desiredCfg) |
| 246 | if err != nil { |
| 247 | return err |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | i.updateProcessorMetrics() |
| 252 | |
| 253 | return nil |
| 254 | } |
| 255 | |
| 256 | func (i *instance) filterSupportedProcessors(processors map[string]struct{}) map[string]struct{} { |
| 257 | filtered := make(map[string]struct{}, len(processors)) |