MCPcopy
hub / github.com/grafana/tempo / Flush

Method Flush

modules/blockbuilder/tenant_store.go:104–201  ·  view source on GitHub ↗
(ctx context.Context, r tempodb.Reader, w tempodb.Writer, c tempodb.Compactor)

Source from the content-addressed store, hash-verified

102}
103
104func (s *tenantStore) Flush(ctx context.Context, r tempodb.Reader, w tempodb.Writer, c tempodb.Compactor) error {
105 ctx, span := tracer.Start(ctx, "tenantStore.Flush", trace.WithAttributes(attribute.String("tenant", s.tenantID)))
106 defer span.End()
107
108 if s.liveTraces.Len() == 0 {
109 span.AddEvent("no live_traces to flush")
110 // This can happen if the tenant instance was created but
111 // no live traces were successfully pushed. i.e. all exceeded max trace size.
112 return nil
113 }
114
115 blockID, existingBlocksToBeCompacted, err := s.determineBlockIDs(ctx, r)
116 if err != nil {
117 return err
118 }
119
120 // TODO - Check if the existing block can be reused and exit early
121
122 if len(existingBlocksToBeCompacted) > 0 {
123 for _, blockID := range existingBlocksToBeCompacted {
124 level.Warn(s.logger).Log(
125 "msg", "Marking existing block compacted",
126 "tenant", s.tenantID,
127 "blockid", blockID,
128 )
129 span.AddEvent("marking existing block compacted", trace.WithAttributes(attribute.String("block_id", blockID.String())))
130 if err := c.MarkBlockCompacted(s.tenantID, blockID); err != nil {
131 return err
132 }
133 }
134 }
135
136 // Initial meta for creating the block
137 meta := backend.NewBlockMeta(s.tenantID, (uuid.UUID)(blockID), s.enc.Version())
138 meta.DedicatedColumns = s.getDedicatedColumns()
139 meta.ReplicationFactor = 1
140 meta.TotalObjects = int64(s.liveTraces.Len())
141
142 var (
143 st = time.Now()
144 l = s.wal.LocalBackend()
145 reader = backend.NewReader(l)
146 writer = backend.NewWriter(l)
147 iter = newLiveTracesIter(s.liveTraces)
148 )
149
150 level.Info(s.logger).Log(
151 "msg", "Flushing block",
152 "tenant", s.tenantID,
153 "blockid", meta.BlockID,
154 "meta", meta,
155 )
156
157 newMeta, err := s.enc.CreateBlock(ctx, &s.cfg.BlockConfig, meta, iter, reader, writer)
158 if err != nil {
159 return err
160 }
161

Callers

nothing calls this directly

Calls 15

determineBlockIDsMethod · 0.95
getDedicatedColumnsMethod · 0.95
NewBlockMetaFunction · 0.92
NewReaderFunction · 0.92
NewWriterFunction · 0.92
newLiveTracesIterFunction · 0.85
NewWriteableBlockFunction · 0.85
LocalBackendMethod · 0.80
DedupedSpansMethod · 0.80
MinMaxTimestampsMethod · 0.80
StartMethod · 0.65

Tested by

no test coverage detected