(ctx context.Context, r tempodb.Reader, w tempodb.Writer, c tempodb.Compactor)
| 102 | } |
| 103 | |
| 104 | func (s *tenantStore) Flush(ctx context.Context, r tempodb.Reader, w tempodb.Writer, c tempodb.Compactor) error { |
| 105 | ctx, span := tracer.Start(ctx, "tenantStore.Flush", trace.WithAttributes(attribute.String("tenant", s.tenantID))) |
| 106 | defer span.End() |
| 107 | |
| 108 | if s.liveTraces.Len() == 0 { |
| 109 | span.AddEvent("no live_traces to flush") |
| 110 | // This can happen if the tenant instance was created but |
| 111 | // no live traces were successfully pushed. i.e. all exceeded max trace size. |
| 112 | return nil |
| 113 | } |
| 114 | |
| 115 | blockID, existingBlocksToBeCompacted, err := s.determineBlockIDs(ctx, r) |
| 116 | if err != nil { |
| 117 | return err |
| 118 | } |
| 119 | |
| 120 | // TODO - Check if the existing block can be reused and exit early |
| 121 | |
| 122 | if len(existingBlocksToBeCompacted) > 0 { |
| 123 | for _, blockID := range existingBlocksToBeCompacted { |
| 124 | level.Warn(s.logger).Log( |
| 125 | "msg", "Marking existing block compacted", |
| 126 | "tenant", s.tenantID, |
| 127 | "blockid", blockID, |
| 128 | ) |
| 129 | span.AddEvent("marking existing block compacted", trace.WithAttributes(attribute.String("block_id", blockID.String()))) |
| 130 | if err := c.MarkBlockCompacted(s.tenantID, blockID); err != nil { |
| 131 | return err |
| 132 | } |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | // Initial meta for creating the block |
| 137 | meta := backend.NewBlockMeta(s.tenantID, (uuid.UUID)(blockID), s.enc.Version()) |
| 138 | meta.DedicatedColumns = s.getDedicatedColumns() |
| 139 | meta.ReplicationFactor = 1 |
| 140 | meta.TotalObjects = int64(s.liveTraces.Len()) |
| 141 | |
| 142 | var ( |
| 143 | st = time.Now() |
| 144 | l = s.wal.LocalBackend() |
| 145 | reader = backend.NewReader(l) |
| 146 | writer = backend.NewWriter(l) |
| 147 | iter = newLiveTracesIter(s.liveTraces) |
| 148 | ) |
| 149 | |
| 150 | level.Info(s.logger).Log( |
| 151 | "msg", "Flushing block", |
| 152 | "tenant", s.tenantID, |
| 153 | "blockid", meta.BlockID, |
| 154 | "meta", meta, |
| 155 | ) |
| 156 | |
| 157 | newMeta, err := s.enc.CreateBlock(ctx, &s.cfg.BlockConfig, meta, iter, reader, writer) |
| 158 | if err != nil { |
| 159 | return err |
| 160 | } |
| 161 |
nothing calls this directly
no test coverage detected