MCPcopy
hub / github.com/grafana/tempo / Next

Method Next

modules/backendscheduler/backendscheduler.go:232–353  ·  view source on GitHub ↗

Next implements the BackendSchedulerServer interface. It returns the next queued job for a worker.

(ctx context.Context, req *tempopb.NextJobRequest)

Source from the content-addressed store, hash-verified

230
231// Next implements the BackendSchedulerServer interface. It returns the next queued job for a worker.
232func (s *BackendScheduler) Next(ctx context.Context, req *tempopb.NextJobRequest) (*tempopb.NextJobResponse, error) {
233 ctx, span := tracer.Start(ctx, "Next")
234 defer span.End()
235
236 span.SetAttributes(attribute.String("worker_id", req.WorkerId))
237
238 // Find jobs that already exist for this worker
239 j := s.work.GetJobForWorker(ctx, req.WorkerId)
240 if j != nil {
241 resp := &tempopb.NextJobResponse{
242 JobId: j.ID,
243 Type: j.Type,
244 Detail: j.JobDetail,
245 }
246
247 // The job exists in memory, but may not have been persisted to disk.
248 err := s.work.FlushToLocal(ctx, s.cfg.LocalWorkPath, []string{j.ID})
249 if err != nil {
250 // Fail without returning the job if we can't update the job cache.
251 return &tempopb.NextJobResponse{}, status.Error(codes.Internal, ErrFlushFailed.Error())
252 }
253
254 span.SetAttributes(attribute.String("job_id", j.ID))
255
256 metricJobsRetry.WithLabelValues(j.JobDetail.Tenant, j.GetType().String(), j.GetWorkerID()).Inc()
257
258 level.Info(log.Logger).Log("msg", "assigned previous job to worker", "job_id", j.ID, "worker", req.WorkerId)
259
260 return resp, nil
261 }
262
263 timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.JobTimeout)
264 defer cancel()
265
266 // Loop so that stale jobs (whose preconditions no longer hold) can be
267 // silently discarded and we immediately try the next one, rather than
268 // handing an invalid job to a worker.
269 for {
270 select {
271 case j := <-s.mergedJobs:
272 if j == nil {
273 // Channel closed, no jobs available
274 metricJobsNotFound.WithLabelValues(req.WorkerId).Inc()
275 return &tempopb.NextJobResponse{}, status.Error(codes.Internal, ErrNilJob.Error())
276 }
277
278 span.AddEvent("job received", trace.WithAttributes(
279 attribute.String("job_id", j.GetID()),
280 ))
281
282 // All current job types require a tenant. Legacy global retention jobs
283 // emitted by old scheduler binaries have an empty tenant and bypass
284 // the per-type precondition checks.
285 if j.Tenant() == "" {
286 level.Debug(log.Logger).Log("msg", "legacy global job without tenant, passing through",
287 "job_id", j.ID, "type", j.GetType().String())
288 } else {
289 drop := false

Callers

nothing calls this directly

Calls 15

GetWorkerIDMethod · 0.80
GetIDMethod · 0.80
TenantMethod · 0.80
SetWorkerIDMethod · 0.80
IntMethod · 0.80
StartMethod · 0.65
GetJobForWorkerMethod · 0.65
FlushToLocalMethod · 0.65
ErrorMethod · 0.65
IncMethod · 0.65
LogMethod · 0.65
HasJobsForTenantMethod · 0.65

Tested by

no test coverage detected