(cfg Config, flusher completeBlockFlusher, logger log.Logger)
| 73 | } |
| 74 | |
| 75 | func newCompleteBlockLifecycle(cfg Config, flusher completeBlockFlusher, logger log.Logger) (completeBlockLifecycle, error) { |
| 76 | if cfg.ConsumeFromKafka { |
| 77 | return kafkaCompleteBlockLifecycle{}, nil |
| 78 | } |
| 79 | |
| 80 | if flusher == nil { |
| 81 | return nil, fmt.Errorf("complete block flusher is required when kafka consumption is disabled") |
| 82 | } |
| 83 | |
| 84 | return &localCompleteBlockLifecycle{ |
| 85 | flusher: flusher, |
| 86 | logger: logger, |
| 87 | flushConcurrency: cfg.CompleteBlockConcurrency, |
| 88 | retryDelay: cfg.initialBackoff, |
| 89 | completeBlockQueue: flushqueues.New[*localCompleteBlockOp](nil), |
| 90 | }, nil |
| 91 | } |
| 92 | |
| 93 | // The lifecycle for Kafka mode is noop |
| 94 |
no outgoing calls