()
| 243 | } |
| 244 | |
| 245 | func (t *App) initDistributor() (services.Service, error) { |
| 246 | singleBinary := IsSingleBinary(t.cfg.Target) |
| 247 | localPushTargets := distributor.LocalPushTargets{} |
| 248 | var partitionRing ring.PartitionRingReader |
| 249 | |
| 250 | t.cfg.Distributor.PushSpansToKafka = !singleBinary |
| 251 | |
| 252 | if singleBinary { |
| 253 | localPushTargets.Generator = func(ctx context.Context, req *tempopb.PushSpansRequest) (*tempopb.PushResponse, error) { |
| 254 | if t.generator == nil { |
| 255 | return nil, errors.New("metrics-generator not initialized") |
| 256 | } |
| 257 | return t.generator.PushSpans(ctx, req) |
| 258 | } |
| 259 | |
| 260 | localPushTargets.LiveStore = func(ctx context.Context, req *tempopb.PushBytesRequest) (*tempopb.PushResponse, error) { |
| 261 | if t.liveStore == nil { |
| 262 | return nil, errors.New("live-store not initialized") |
| 263 | } |
| 264 | return t.liveStore.PushBytes(ctx, req) |
| 265 | } |
| 266 | } else { |
| 267 | t.cfg.Distributor.KafkaConfig = t.cfg.Ingest.Kafka |
| 268 | partitionRing = t.partitionRing |
| 269 | } |
| 270 | |
| 271 | // todo: make write-path client a module instead of passing the config everywhere |
| 272 | distributor, err := distributor.New(t.cfg.Distributor, |
| 273 | localPushTargets, |
| 274 | partitionRing, |
| 275 | t.Overrides, |
| 276 | t.TracesConsumerMiddleware, |
| 277 | log.Logger, t.cfg.Server.LogLevel, prometheus.DefaultRegisterer) |
| 278 | if err != nil { |
| 279 | return nil, fmt.Errorf("failed to create distributor: %w", err) |
| 280 | } |
| 281 | t.distributor = distributor |
| 282 | |
| 283 | if distributor.DistributorRing != nil { |
| 284 | t.Server.HTTPRouter().Handle("/distributor/ring", distributor.DistributorRing) |
| 285 | } |
| 286 | |
| 287 | if usageHandler := distributor.UsageTrackerHandler(); usageHandler != nil { |
| 288 | t.Server.HTTPRouter().Handle("/usage_metrics", usageHandler) |
| 289 | } |
| 290 | |
| 291 | return t.distributor, nil |
| 292 | } |
| 293 | |
| 294 | func (t *App) initGenerator() (services.Service, error) { |
| 295 | t.configureGenerator() |
nothing calls this directly
no test coverage detected