MCPcopy
hub / github.com/grafana/tempo / starting

Method starting

modules/backendscheduler/backendscheduler.go:124–180  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

122}
123
124func (s *BackendScheduler) starting(ctx context.Context) error {
125 level.Info(log.Logger).Log("msg", "backend scheduler starting")
126
127 if s.cfg.Poll {
128 s.store.EnablePolling(ctx, blocklist.OwnsNothingSharder, true)
129 }
130
131 err := s.loadWorkCache(ctx)
132 if err != nil && !errors.Is(err, backend.ErrDoesNotExist) {
133 return fmt.Errorf("failed to load work cache: %w", err)
134 }
135
136 // Load the batch manifest (best-effort; missing file means no active redaction batches).
137 if err := s.work.LoadBatchesFromLocal(ctx, s.cfg.LocalWorkPath); err != nil {
138 level.Info(log.Logger).Log("msg", "no batch manifest found at startup", "err", err)
139 }
140
141 wg := sync.WaitGroup{}
142
143 for i := range s.providers {
144 s.providers[i].jobs = s.providers[i].provider.Start(ctx)
145
146 wg.Add(1)
147 // Start a goroutine to forward jobs from each provider to the merged channel
148 go func(jobs <-chan *work.Job) {
149 defer wg.Done()
150
151 var job *work.Job
152
153 for {
154 select {
155 case job = <-jobs:
156 case <-ctx.Done():
157 level.Info(log.Logger).Log("msg", "stopping provider", "provider", i)
158 return
159 }
160
161 select {
162 case s.mergedJobs <- job:
163 metricProviderJobsMerged.WithLabelValues(strconv.Itoa(i)).Inc()
164 case <-ctx.Done():
165 level.Info(log.Logger).Log("msg", "stopping provider", "provider", i)
166 return
167 }
168 }
169 }(s.providers[i].jobs)
170 }
171
172 // Start a goroutine to close the merged channel when all providers are done
173 go func() {
174 wg.Wait()
175 level.Info(log.Logger).Log("msg", "all providers stopped")
176 close(s.mergedJobs)
177 }()
178
179 return nil
180}
181

Callers 4

TestShardedIntegrationFunction · 0.45
TestBackendSchedulerFunction · 0.45

Calls 9

loadWorkCacheMethod · 0.95
LogMethod · 0.65
EnablePollingMethod · 0.65
LoadBatchesFromLocalMethod · 0.65
StartMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
IncMethod · 0.65
WaitMethod · 0.65

Tested by 4

TestShardedIntegrationFunction · 0.36
TestBackendSchedulerFunction · 0.36