| 165 | } |
| 166 | |
| 167 | func New(cfg Config, overridesService overrides.Interface, completeBlockFlusher completeBlockFlusher, logger log.Logger, reg prometheus.Registerer) (*LiveStore, error) { |
| 168 | completeBlockEncoding, encErr := encoding.FromVersionForWrites(cfg.BlockConfig.Version) |
| 169 | if encErr != nil { |
| 170 | return nil, fmt.Errorf("block version validation failed: %w", encErr) |
| 171 | } |
| 172 | |
| 173 | completeBlockLifecycle, lifecycleErr := newCompleteBlockLifecycle(cfg, completeBlockFlusher, logger) |
| 174 | if lifecycleErr != nil { |
| 175 | return nil, fmt.Errorf("create complete block lifecycle: %w", lifecycleErr) |
| 176 | } |
| 177 | |
| 178 | ctx, cancel := context.WithCancel(context.Background()) |
| 179 | |
| 180 | s := &LiveStore{ |
| 181 | cfg: cfg, |
| 182 | logger: logger, |
| 183 | reg: reg, |
| 184 | decoder: ingest.NewDecoder(), |
| 185 | completeBlockEncoding: completeBlockEncoding, |
| 186 | ctx: ctx, |
| 187 | cancel: cancel, |
| 188 | instances: make(map[string]*instance), |
| 189 | overrides: overridesService, |
| 190 | completeBlockLifecycle: completeBlockLifecycle, |
| 191 | completeQueues: flushqueues.New[*completeOp](metricCompleteQueueLength), |
| 192 | startupComplete: make(chan struct{}), |
| 193 | cutToWalStop: make(chan struct{}), |
| 194 | } |
| 195 | |
| 196 | // Initialize ready state to starting |
| 197 | s.readyErr.Store(&ErrStarting) |
| 198 | metricReady.Set(0) |
| 199 | s.lastRecordTimeNanos.Store(-1) |
| 200 | |
| 201 | var err error |
| 202 | if cfg.ConsumeFromKafka { |
| 203 | s.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.Ring.InstanceID) |
| 204 | if err != nil { |
| 205 | return nil, fmt.Errorf("calculating livestore partition ID: %w", err) |
| 206 | } |
| 207 | |
| 208 | // TODO: It's probably easier to just use the ID directly |
| 209 | // https://raintank-corp.slack.com/archives/C05CAA0ULUF/p1752847274420489 |
| 210 | s.cfg.IngestConfig.Kafka.ConsumerGroup, err = ingest.LiveStoreConsumerGroupID(cfg.Ring.InstanceID) |
| 211 | if err != nil { |
| 212 | return nil, fmt.Errorf("calculating livestore consumer group ID: %w", err) |
| 213 | } |
| 214 | } |
| 215 | |
| 216 | // set up partition ring |
| 217 | partitionRingKV := cfg.PartitionRing.KVStore.Mock |
| 218 | if partitionRingKV == nil { |
| 219 | partitionRingKV, err = kv.NewClient(cfg.PartitionRing.KVStore, ring.GetPartitionRingCodec(), kv.RegistererWithKVName(reg, PartitionRingName+"-lifecycler"), logger) |
| 220 | if err != nil { |
| 221 | return nil, fmt.Errorf("creating KV store for livestore partition ring: %w", err) |
| 222 | } |
| 223 | } |
| 224 | |