MCPcopy
hub / github.com/grafana/tempo / retainTenant

Method retainTenant

tempodb/retention.go:68–130  ·  view source on GitHub ↗
(ctx context.Context, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides)

Source from the content-addressed store, hash-verified

66}
67
68func (rw *readerWriter) retainTenant(ctx context.Context, tenantID string, compactorCfg *CompactorConfig, compactorSharder CompactorSharder, compactorOverrides CompactorOverrides) {
69 start := time.Now()
70 defer func() { metricRetentionDuration.Observe(time.Since(start).Seconds()) }()
71
72 // Check for overrides
73 retention := compactorCfg.BlockRetention // Default
74 if r := compactorOverrides.BlockRetentionForTenant(tenantID); r != 0 {
75 retention = r
76 }
77 level.Debug(rw.logger).Log("msg", "Performing block retention", "tenantID", tenantID, "retention", retention)
78
79 // iterate through the block list and mark as compacted any block that is past retention.
80 cutoff := time.Now().Add(-retention)
81 blocklist := rw.blocklist.Metas(tenantID)
82 for _, b := range blocklist {
83 select {
84 case <-ctx.Done():
85 return
86 default:
87 if b.EndTime.Before(cutoff) && compactorSharder.Owns(b.BlockID.String()) {
88 level.Info(rw.logger).Log("msg", "marking block for deletion", "blockID", b.BlockID, "tenantID", tenantID)
89 err := rw.c.MarkBlockCompacted((uuid.UUID)(b.BlockID), tenantID)
90 if err != nil {
91 level.Error(rw.logger).Log("msg", "failed to mark block compacted during retention", "blockID", b.BlockID, "tenantID", tenantID, "err", err)
92 metricRetentionErrors.Inc()
93 } else {
94 metricMarkedForDeletion.Inc()
95
96 rw.blocklist.Update(tenantID, nil, []*backend.BlockMeta{b}, []*backend.CompactedBlockMeta{
97 {
98 BlockMeta: *b,
99 CompactedTime: time.Now(),
100 },
101 }, nil)
102 }
103 }
104 }
105 }
106
107 // iterate through compacted list looking for blocks ready to be cleared
108 cutoff = time.Now().Add(-compactorCfg.CompactedBlockRetention)
109 compactedBlocklist := rw.blocklist.CompactedMetas(tenantID)
110 for _, b := range compactedBlocklist {
111 select {
112 case <-ctx.Done():
113 return
114 default:
115 level.Debug(rw.logger).Log("owns", compactorSharder.Owns(b.BlockID.String()), "blockID", b.BlockID, "tenantID", tenantID)
116 if b.CompactedTime.Before(cutoff) && compactorSharder.Owns(b.BlockID.String()) {
117 level.Info(rw.logger).Log("msg", "deleting block", "blockID", b.BlockID, "tenantID", tenantID)
118 err := rw.c.ClearBlock((uuid.UUID)(b.BlockID), tenantID)
119 if err != nil {
120 level.Error(rw.logger).Log("msg", "failed to clear compacted block during retention", "blockID", b.BlockID, "tenantID", tenantID, "err", err)
121 metricRetentionErrors.Inc()
122 } else {
123 metricDeleted.Inc()
124
125 rw.blocklist.Update(tenantID, nil, nil, nil, []*backend.CompactedBlockMeta{b})

Callers 2

RetainWithConfigMethod · 0.95

Calls 15

NowMethod · 0.65
ObserveMethod · 0.65
LogMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
OwnsMethod · 0.65
MarkBlockCompactedMethod · 0.65
ErrorMethod · 0.65
IncMethod · 0.65
ClearBlockMethod · 0.65
MetasMethod · 0.45

Tested by

no test coverage detected