(rw *readerWriter, tenantID string, oldBlocks, newBlocks []*backend.BlockMeta)
| 364 | } |
| 365 | |
| 366 | func markCompacted(rw *readerWriter, tenantID string, oldBlocks, newBlocks []*backend.BlockMeta) error { |
| 367 | // markCompacted marks every block in oldBlocks as compacted in the backend, then updates the in-memory blocklist. Backend failures are counted but do not stop the loop; the remaining blocks are still marked and a summary error is returned at the end. |
| 368 | var errCount int |
| 369 | for _, meta := range oldBlocks { |
| 370 | // Mark in the backend; on failure, count it, log it, and bump the compaction error metric instead of aborting. |
| 371 | if err := rw.c.MarkBlockCompacted((uuid.UUID)(meta.BlockID), tenantID); err != nil { |
| 372 | errCount++ |
| 373 | level.Error(rw.logger).Log("msg", "unable to mark block compacted", "blockID", meta.BlockID, "tenantID", tenantID, "err", err) |
| 374 | metricCompactionErrors.Inc() |
| 375 | } |
| 376 | } |
| 377 | |
| 378 | // Convert the outgoing (old) blocks into compacted entries stamped with the current time. Note: the loop variable below is named newBlock but it iterates over oldBlocks. |
| 379 | newCompactions := make([]*backend.CompactedBlockMeta, 0, len(oldBlocks)) |
| 380 | for _, newBlock := range oldBlocks { |
| 381 | newCompactions = append(newCompactions, &backend.CompactedBlockMeta{ |
| 382 | BlockMeta: *newBlock, |
| 383 | CompactedTime: time.Now(), |
| 384 | }) |
| 385 | } |
| 386 | |
| 387 | // Update the blocklist in memory. This runs unconditionally, even when some backend marks failed; the error is only reported afterwards. |
| 388 | rw.blocklist.Update(tenantID, newBlocks, oldBlocks, newCompactions, nil) |
| 389 | |
| 390 | if errCount > 0 { |
| 391 | return fmt.Errorf("unable to mark %d blocks compacted", errCount) |
| 392 | } |
| 393 | |
| 394 | return nil |
| 395 | } |
| 396 | |
| 397 | func MeasureOutstandingBlocks(tenantID string, blockSelector blockselector.CompactionBlockSelector, owned func(hash string) bool) { |
| 398 | // count number of per-tenant outstanding blocks before next maintenance cycle |
no test coverage detected