| 698 | } |
| 699 | |
| 700 | func (t *App) initLiveStore() (services.Service, error) { |
| 701 | // In single-binary mode traces are pushed in-process from distributor, |
| 702 | // so live-store does not consume directly from Kafka. |
| 703 | t.cfg.LiveStore.ConsumeFromKafka = !IsSingleBinary(t.cfg.Target) |
| 704 | |
| 705 | // Inject config from other locations. |
| 706 | t.cfg.LiveStore.IngestConfig = t.cfg.Ingest |
| 707 | t.cfg.LiveStore.Ring.ListenPort = t.cfg.Server.GRPCListenPort |
| 708 | // Block config and WAL version are always sourced from storage.trace.block. |
| 709 | t.cfg.LiveStore.BlockConfig = *t.cfg.StorageConfig.Trace.Block |
| 710 | t.cfg.LiveStore.WAL.Version = t.cfg.StorageConfig.Trace.Block.Version |
| 711 | |
| 712 | var err error |
| 713 | t.liveStore, err = livestore.New(t.cfg.LiveStore, t.Overrides, t.store, log.Logger, prometheus.DefaultRegisterer) |
| 714 | if err != nil { |
| 715 | return nil, fmt.Errorf("failed to create liveStore: %w", err) |
| 716 | } |
| 717 | |
| 718 | tempopb.RegisterQuerierServer(t.Server.GRPC(), t.liveStore) |
| 719 | tempopb.RegisterMetricsServer(t.Server.GRPC(), t.liveStore) |
| 720 | |
| 721 | t.Server.HTTPRouter().Methods(http.MethodGet, http.MethodPost, http.MethodDelete). |
| 722 | Path("/live-store/prepare-partition-downscale"). |
| 723 | Handler(http.HandlerFunc(t.liveStore.PreparePartitionDownscaleHandler)) |
| 724 | t.Server.HTTPRouter().Methods(http.MethodGet, http.MethodPost, http.MethodDelete). |
| 725 | Path("/live-store/prepare-downscale"). |
| 726 | Handler(http.HandlerFunc(t.liveStore.PrepareDownscaleHandler)) |
| 727 | |
| 728 | return t.liveStore, nil |
| 729 | } |
| 730 | |
| 731 | func (t *App) setupModuleManager() error { |
| 732 | mm := modules.NewManager(log.Logger) |