Next implements the BackendSchedulerServer interface. It returns the next queued job for a worker.
(ctx context.Context, req *tempopb.NextJobRequest)
| 230 | |
| 231 | // Next implements the BackendSchedulerServer interface. It returns the next queued job for a worker. |
| 232 | func (s *BackendScheduler) Next(ctx context.Context, req *tempopb.NextJobRequest) (*tempopb.NextJobResponse, error) { |
| 233 | ctx, span := tracer.Start(ctx, "Next") |
| 234 | defer span.End() |
| 235 | |
| 236 | span.SetAttributes(attribute.String("worker_id", req.WorkerId)) |
| 237 | |
| 238 | // Find jobs that already exist for this worker |
| 239 | j := s.work.GetJobForWorker(ctx, req.WorkerId) |
| 240 | if j != nil { |
| 241 | resp := &tempopb.NextJobResponse{ |
| 242 | JobId: j.ID, |
| 243 | Type: j.Type, |
| 244 | Detail: j.JobDetail, |
| 245 | } |
| 246 | |
| 247 | // The job exists in memory, but may not have been persisted to disk. |
| 248 | err := s.work.FlushToLocal(ctx, s.cfg.LocalWorkPath, []string{j.ID}) |
| 249 | if err != nil { |
| 250 | // Fail without returning the job if we can't update the job cache. |
| 251 | return &tempopb.NextJobResponse{}, status.Error(codes.Internal, ErrFlushFailed.Error()) |
| 252 | } |
| 253 | |
| 254 | span.SetAttributes(attribute.String("job_id", j.ID)) |
| 255 | |
| 256 | metricJobsRetry.WithLabelValues(j.JobDetail.Tenant, j.GetType().String(), j.GetWorkerID()).Inc() |
| 257 | |
| 258 | level.Info(log.Logger).Log("msg", "assigned previous job to worker", "job_id", j.ID, "worker", req.WorkerId) |
| 259 | |
| 260 | return resp, nil |
| 261 | } |
| 262 | |
| 263 | timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.JobTimeout) |
| 264 | defer cancel() |
| 265 | |
| 266 | // Loop so that stale jobs (whose preconditions no longer hold) can be |
| 267 | // silently discarded and we immediately try the next one, rather than |
| 268 | // handing an invalid job to a worker. |
| 269 | for { |
| 270 | select { |
| 271 | case j := <-s.mergedJobs: |
| 272 | if j == nil { |
| 273 | // Channel closed, no jobs available |
| 274 | metricJobsNotFound.WithLabelValues(req.WorkerId).Inc() |
| 275 | return &tempopb.NextJobResponse{}, status.Error(codes.Internal, ErrNilJob.Error()) |
| 276 | } |
| 277 | |
| 278 | span.AddEvent("job received", trace.WithAttributes( |
| 279 | attribute.String("job_id", j.GetID()), |
| 280 | )) |
| 281 | |
| 282 | // All current job types require a tenant. Legacy global retention jobs |
| 283 | // emitted by old scheduler binaries have an empty tenant and bypass |
| 284 | // the per-type precondition checks. |
| 285 | if j.Tenant() == "" { |
| 286 | level.Debug(log.Logger).Log("msg", "legacy global job without tenant, passing through", |
| 287 | "job_id", j.ID, "type", j.GetType().String()) |
| 288 | } else { |
| 289 | drop := false |
nothing calls this directly
no test coverage detected