| 93 | } |
| 94 | |
| 95 | func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverrides, wal storage.Storage, logger log.Logger) (*instance, error) { |
| 96 | logger = log.With(logger, "tenant", instanceID) |
| 97 | |
| 98 | limitLogger := tempo_log.NewRateLimitedLogger(1, level.Warn(logger)) |
| 99 | var limiter registry.Limiter |
| 100 | switch cfg.LimiterType { |
| 101 | case LimiterTypeSeries: |
| 102 | limiter = localserieslimiter.New(overrides.MetricsGeneratorMaxActiveSeries, instanceID, limitLogger) |
| 103 | case LimiterTypeEntity: |
| 104 | limiter = localentitylimiter.New(overrides.MetricsGeneratorMaxActiveEntities, instanceID, limitLogger) |
| 105 | default: |
| 106 | return nil, fmt.Errorf("invalid limiter type: %s", cfg.LimiterType) |
| 107 | } |
| 108 | |
| 109 | i := &instance{ |
| 110 | cfg: cfg, |
| 111 | instanceID: instanceID, |
| 112 | overrides: overrides, |
| 113 | |
| 114 | registry: registry.New(&cfg.Registry, overrides, instanceID, wal, logger, limiter), |
| 115 | wal: wal, |
| 116 | |
| 117 | processors: make(map[string]processor.Processor), |
| 118 | |
| 119 | shutdownCh: make(chan struct{}, 1), |
| 120 | |
| 121 | logger: logger, |
| 122 | } |
| 123 | |
| 124 | err := i.updateProcessors() |
| 125 | if err != nil { |
| 126 | return nil, fmt.Errorf("could not initialize processors: %w", err) |
| 127 | } |
| 128 | go i.watchOverrides() |
| 129 | |
| 130 | return i, nil |
| 131 | } |
| 132 | |
| 133 | func (i *instance) watchOverrides() { |
| 134 | reloadPeriod := 10 * time.Second |