MCPcopy
hub / github.com/grafana/tempo / New

Function New

modules/livestore/live_store.go:167–270  ·  view source on GitHub ↗
(cfg Config, overridesService overrides.Interface, completeBlockFlusher completeBlockFlusher, logger log.Logger, reg prometheus.Registerer)

Source from the content-addressed store, hash-verified

165}
166
167func New(cfg Config, overridesService overrides.Interface, completeBlockFlusher completeBlockFlusher, logger log.Logger, reg prometheus.Registerer) (*LiveStore, error) {
168 completeBlockEncoding, encErr := encoding.FromVersionForWrites(cfg.BlockConfig.Version)
169 if encErr != nil {
170 return nil, fmt.Errorf("block version validation failed: %w", encErr)
171 }
172
173 completeBlockLifecycle, lifecycleErr := newCompleteBlockLifecycle(cfg, completeBlockFlusher, logger)
174 if lifecycleErr != nil {
175 return nil, fmt.Errorf("create complete block lifecycle: %w", lifecycleErr)
176 }
177
178 ctx, cancel := context.WithCancel(context.Background())
179
180 s := &LiveStore{
181 cfg: cfg,
182 logger: logger,
183 reg: reg,
184 decoder: ingest.NewDecoder(),
185 completeBlockEncoding: completeBlockEncoding,
186 ctx: ctx,
187 cancel: cancel,
188 instances: make(map[string]*instance),
189 overrides: overridesService,
190 completeBlockLifecycle: completeBlockLifecycle,
191 completeQueues: flushqueues.New[*completeOp](metricCompleteQueueLength),
192 startupComplete: make(chan struct{}),
193 cutToWalStop: make(chan struct{}),
194 }
195
196 // Initialize ready state to starting
197 s.readyErr.Store(&ErrStarting)
198 metricReady.Set(0)
199 s.lastRecordTimeNanos.Store(-1)
200
201 var err error
202 if cfg.ConsumeFromKafka {
203 s.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.Ring.InstanceID)
204 if err != nil {
205 return nil, fmt.Errorf("calculating livestore partition ID: %w", err)
206 }
207
208 // TODO: It's probably easier to just use the ID directly
209 // https://raintank-corp.slack.com/archives/C05CAA0ULUF/p1752847274420489
210 s.cfg.IngestConfig.Kafka.ConsumerGroup, err = ingest.LiveStoreConsumerGroupID(cfg.Ring.InstanceID)
211 if err != nil {
212 return nil, fmt.Errorf("calculating livestore consumer group ID: %w", err)
213 }
214 }
215
216 // setup partition ring
217 partitionRingKV := cfg.PartitionRing.KVStore.Mock
218 if partitionRingKV == nil {
219 partitionRingKV, err = kv.NewClient(cfg.PartitionRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(reg, PartitionRingName+"-lifecycler"), logger)
220 if err != nil {
221 return nil, fmt.Errorf("creating KV store for livestore partition ring: %w", err)
222 }
223 }
224

Calls 8

FromVersionForWritesFunction · 0.92
NewDecoderFunction · 0.92
IngesterPartitionIDFunction · 0.92
LiveStoreConsumerGroupIDFunction · 0.92
StoreMethod · 0.65
SetMethod · 0.65
ToLifecyclerConfigMethod · 0.45