MCP · copy · Index your code
hub / github.com/coder/coder / AcquireJobWithCancel

Method AcquireJobWithCancel

coderd/provisionerdserver/provisionerdserver.go:360–443  ·  view source on GitHub ↗

AcquireJobWithCancel queries the database to lock a job.

(stream proto.DRPCProvisionerDaemon_AcquireJobWithCancelStream)

Source from the content-addressed store, hash-verified

358
359// AcquireJobWithCancel queries the database to lock a job.
360func (s *server) AcquireJobWithCancel(stream proto.DRPCProvisionerDaemon_AcquireJobWithCancelStream) (retErr error) {
361 //nolint:gocritic // Provisionerd has specific authz rules.
362 streamCtx := dbauthz.AsProvisionerd(stream.Context())
363 defer func() {
364 closeErr := stream.Close()
365 s.Logger.Debug(streamCtx, "closed stream", slog.Error(closeErr))
366 if retErr == nil {
367 retErr = closeErr
368 }
369 }()
370 acqCtx, acqCancel := context.WithCancel(streamCtx)
371 defer acqCancel()
372 recvCh := make(chan error, 1)
373 go func() {
374 _, err := stream.Recv() // cancel is the only message
375 recvCh <- err
376 }()
377 jec := make(chan jobAndErr, 1)
378 go func() {
379 job, err := s.Acquirer.AcquireJob(acqCtx, s.OrganizationID, s.ID, s.Provisioners, s.Tags)
380 jec <- jobAndErr{job: job, err: err}
381 }()
382 var recvErr error
383 var je jobAndErr
384 select {
385 case recvErr = <-recvCh:
386 acqCancel()
387 je = <-jec
388 case je = <-jec:
389 }
390 if database.IsQueryCanceledError(je.err) {
391 s.Logger.Debug(streamCtx, "successful cancel")
392 err := stream.Send(&proto.AcquiredJob{})
393 if err != nil {
394 // often this is just because the other side hangs up and doesn't wait for the cancel, so log at INFO
395 s.Logger.Info(streamCtx, "failed to send empty job", slog.Error(err))
396 return err
397 }
398 return nil
399 }
400 if je.err != nil {
401 return xerrors.Errorf("acquire job: %w", je.err)
402 }
403 logger := s.Logger.With(slog.F("job_id", je.job.ID))
404 logger.Debug(streamCtx, "locked job from database")
405
406 if recvErr != nil {
407 logger.Error(streamCtx, "recv error and failed to cancel acquire job", slog.Error(recvErr))
408 // Well, this is awkward. We hit an error receiving from the stream, but didn't cancel before we locked a job
409 // in the database. We need to mark this job as failed so the end user can retry if they want to.
410 now := s.timeNow()
411 err := s.Database.UpdateProvisionerJobWithCompleteByID(
412 //nolint:gocritic // Provisionerd has specific authz rules.
413 dbauthz.AsProvisionerd(context.Background()),
414 database.UpdateProvisionerJobWithCompleteByIDParams{
415 ID: je.job.ID,
416 CompletedAt: sql.NullTime{
417 Time: now,

Callers

nothing calls this directly

Calls 13

timeNow · Method · 0.95
acquireProtoJob · Method · 0.95
AsProvisionerd · Function · 0.92
IsQueryCanceledError · Function · 0.92
Context · Method · 0.65
Close · Method · 0.65
Recv · Method · 0.65
AcquireJob · Method · 0.65
Send · Method · 0.65
Error · Method · 0.45
Info · Method · 0.45

Tested by

no test coverage detected