New creates a distributor.
( cfg Config, localPushTargets LocalPushTargets, partitionRing ring.PartitionRingReader, o overrides.Interface, middleware receiver.Middleware, logger log.Logger, loggingLevel dslog.Level, reg prometheus.Registerer, )
| 215 | |
| 216 | // New creates a distributor. |
| 217 | func New( |
| 218 | cfg Config, |
| 219 | localPushTargets LocalPushTargets, |
| 220 | partitionRing ring.PartitionRingReader, |
| 221 | o overrides.Interface, |
| 222 | middleware receiver.Middleware, |
| 223 | logger log.Logger, |
| 224 | loggingLevel dslog.Level, |
| 225 | reg prometheus.Registerer, |
| 226 | ) (*Distributor, error) { |
| 227 | if err := cfg.Validate(); err != nil { |
| 228 | return nil, err |
| 229 | } |
| 230 | |
| 231 | subservices := []services.Service(nil) |
| 232 | |
| 233 | // Create the configured ingestion rate limit strategy (local or global). |
| 234 | var ingestionRateStrategy limiter.RateLimiterStrategy |
| 235 | var distributorRing *ring.Ring |
| 236 | |
| 237 | if o.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy { |
| 238 | lifecyclerCfg := cfg.DistributorRing.ToLifecyclerConfig() |
| 239 | lifecycler, err := ring.NewLifecycler(lifecyclerCfg, nil, "distributor", cfg.OverrideRingKey, false, logger, prometheus.WrapRegistererWithPrefix("tempo_", reg)) |
| 240 | if err != nil { |
| 241 | return nil, err |
| 242 | } |
| 243 | subservices = append(subservices, lifecycler) |
| 244 | ingestionRateStrategy = newGlobalIngestionRateStrategy(o, lifecycler) |
| 245 | |
| 246 | ring, err := ring.New(lifecyclerCfg.RingConfig, "distributor", cfg.OverrideRingKey, logger, prometheus.WrapRegistererWithPrefix("tempo_", reg)) |
| 247 | if err != nil { |
| 248 | return nil, fmt.Errorf("unable to initialize distributor ring: %w", err) |
| 249 | } |
| 250 | distributorRing = ring |
| 251 | subservices = append(subservices, distributorRing) |
| 252 | } else { |
| 253 | ingestionRateStrategy = newLocalIngestionRateStrategy(o) |
| 254 | } |
| 255 | |
| 256 | pushSpansToKafka := cfg.PushSpansToKafka |
| 257 | |
| 258 | d := &Distributor{ |
| 259 | cfg: cfg, |
| 260 | DistributorRing: distributorRing, |
| 261 | ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), |
| 262 | localPushTargets: localPushTargets, |
| 263 | partitionRing: partitionRing, |
| 264 | pushSpansToKafka: pushSpansToKafka, |
| 265 | overrides: o, |
| 266 | tracePushMiddlewares: cfg.TracePushMiddlewares, |
| 267 | truncationLogger: tempo_log.NewRateLimitedLogger(truncationLogsPerSecond, level.Warn(logger)), |
| 268 | logger: logger, |
| 269 | sleep: time.Sleep, |
| 270 | now: time.Now, |
| 271 | } |
| 272 | |
| 273 | if cfg.Usage.CostAttribution.Enabled { |
| 274 | tracker, err := usage.NewTracker(cfg.Usage.CostAttribution, "cost-attribution", o.CostAttributionDimensions, o.CostAttributionMaxCardinality, logger) |