MCPcopy
hub / github.com/grafana/tempo / New

Function New

modules/distributor/distributor.go:217–322  ·  view source on GitHub ↗

New creates a distributor.

(
	cfg Config,
	localPushTargets LocalPushTargets,
	partitionRing ring.PartitionRingReader,
	o overrides.Interface,
	middleware receiver.Middleware,
	logger log.Logger,
	loggingLevel dslog.Level,
	reg prometheus.Registerer,
)

Source from the content-addressed store, hash-verified

215
216// New creates a distributor.
217func New(
218 cfg Config,
219 localPushTargets LocalPushTargets,
220 partitionRing ring.PartitionRingReader,
221 o overrides.Interface,
222 middleware receiver.Middleware,
223 logger log.Logger,
224 loggingLevel dslog.Level,
225 reg prometheus.Registerer,
226) (*Distributor, error) {
227 if err := cfg.Validate(); err != nil {
228 return nil, err
229 }
230
231 subservices := []services.Service(nil)
232
233 // Create the configured ingestion rate limit strategy (local or global).
234 var ingestionRateStrategy limiter.RateLimiterStrategy
235 var distributorRing *ring.Ring
236
237 if o.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy {
238 lifecyclerCfg := cfg.DistributorRing.ToLifecyclerConfig()
239 lifecycler, err := ring.NewLifecycler(lifecyclerCfg, nil, "distributor", cfg.OverrideRingKey, false, logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
240 if err != nil {
241 return nil, err
242 }
243 subservices = append(subservices, lifecycler)
244 ingestionRateStrategy = newGlobalIngestionRateStrategy(o, lifecycler)
245
246 ring, err := ring.New(lifecyclerCfg.RingConfig, "distributor", cfg.OverrideRingKey, logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
247 if err != nil {
248 return nil, fmt.Errorf("unable to initialize distributor ring: %w", err)
249 }
250 distributorRing = ring
251 subservices = append(subservices, distributorRing)
252 } else {
253 ingestionRateStrategy = newLocalIngestionRateStrategy(o)
254 }
255
256 pushSpansToKafka := cfg.PushSpansToKafka
257
258 d := &Distributor{
259 cfg: cfg,
260 DistributorRing: distributorRing,
261 ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
262 localPushTargets: localPushTargets,
263 partitionRing: partitionRing,
264 pushSpansToKafka: pushSpansToKafka,
265 overrides: o,
266 tracePushMiddlewares: cfg.TracePushMiddlewares,
267 truncationLogger: tempo_log.NewRateLimitedLogger(truncationLogsPerSecond, level.Warn(logger)),
268 logger: logger,
269 sleep: time.Sleep,
270 now: time.Now,
271 }
272
273 if cfg.Usage.CostAttribution.Enabled {
274 tracker, err := usage.NewTracker(cfg.Usage.CostAttribution, "cost-attribution", o.CostAttributionDimensions, o.CostAttributionMaxCardinality, logger)

Calls 11

NewTrackerFunction · 0.92
NewManagerFunction · 0.92
NewFunction · 0.92
NewWriterClientFunction · 0.92
NewProducerFunction · 0.92
newGeneratorForwarderFunction · 0.85
ValidateMethod · 0.65
IngestionRateStrategyMethod · 0.65
ToLifecyclerConfigMethod · 0.45