EnableCompaction activates the compaction/retention loops
(ctx context.Context, cfg *CompactorConfig, c CompactorSharder, overrides CompactorOverrides)
| 583 | |
| 584 | // EnableCompaction activates the compaction/retention loops |
| 585 | func (rw *readerWriter) EnableCompaction(ctx context.Context, cfg *CompactorConfig, c CompactorSharder, overrides CompactorOverrides) error { |
| 586 | // If compactor configuration is not as expected, no need to go any further |
| 587 | err := cfg.validate() |
| 588 | if err != nil { |
| 589 | return err |
| 590 | } |
| 591 | |
| 592 | // Set default if needed. This is mainly for tests. |
| 593 | if cfg.RetentionConcurrency == 0 { |
| 594 | cfg.RetentionConcurrency = DefaultRetentionConcurrency |
| 595 | } |
| 596 | |
| 597 | rw.compactorCfg = cfg |
| 598 | rw.compactorSharder = c |
| 599 | rw.compactorOverrides = overrides |
| 600 | |
| 601 | if rw.cfg.BlocklistPoll == 0 { |
| 602 | level.Info(rw.logger).Log("msg", "polling cycle unset. compaction and retention disabled") |
| 603 | return nil |
| 604 | } |
| 605 | |
| 606 | if cfg != nil { |
| 607 | level.Info(rw.logger).Log("msg", "compaction and retention enabled.") |
| 608 | go rw.compactionLoop(ctx) |
| 609 | go rw.retentionLoop(ctx) |
| 610 | } |
| 611 | |
| 612 | return nil |
| 613 | } |
| 614 | |
| 615 | func (rw *readerWriter) MarkBlockCompacted(tenantID string, blockID backend.UUID) error { |
| 616 | return rw.c.MarkBlockCompacted((uuid.UUID)(blockID), tenantID) |
nothing calls this directly
no test coverage detected