AcquireJobWithCancel queries the database to lock a job.
(stream proto.DRPCProvisionerDaemon_AcquireJobWithCancelStream)
| 358 | |
| 359 | // AcquireJobWithCancel queries the database to lock a job. |
| 360 | func (s *server) AcquireJobWithCancel(stream proto.DRPCProvisionerDaemon_AcquireJobWithCancelStream) (retErr error) { |
| 361 | //nolint:gocritic // Provisionerd has specific authz rules. |
| 362 | streamCtx := dbauthz.AsProvisionerd(stream.Context()) |
| 363 | defer func() { |
| 364 | closeErr := stream.Close() |
| 365 | s.Logger.Debug(streamCtx, "closed stream", slog.Error(closeErr)) |
| 366 | if retErr == nil { |
| 367 | retErr = closeErr |
| 368 | } |
| 369 | }() |
| 370 | acqCtx, acqCancel := context.WithCancel(streamCtx) |
| 371 | defer acqCancel() |
| 372 | recvCh := make(chan error, 1) |
| 373 | go func() { |
| 374 | _, err := stream.Recv() // cancel is the only message |
| 375 | recvCh <- err |
| 376 | }() |
| 377 | jec := make(chan jobAndErr, 1) |
| 378 | go func() { |
| 379 | job, err := s.Acquirer.AcquireJob(acqCtx, s.OrganizationID, s.ID, s.Provisioners, s.Tags) |
| 380 | jec <- jobAndErr{job: job, err: err} |
| 381 | }() |
| 382 | var recvErr error |
| 383 | var je jobAndErr |
| 384 | select { |
| 385 | case recvErr = <-recvCh: |
| 386 | acqCancel() |
| 387 | je = <-jec |
| 388 | case je = <-jec: |
| 389 | } |
| 390 | if database.IsQueryCanceledError(je.err) { |
| 391 | s.Logger.Debug(streamCtx, "successful cancel") |
| 392 | err := stream.Send(&proto.AcquiredJob{}) |
| 393 | if err != nil { |
| 394 | // often this is just because the other side hangs up and doesn't wait for the cancel, so log at INFO |
| 395 | s.Logger.Info(streamCtx, "failed to send empty job", slog.Error(err)) |
| 396 | return err |
| 397 | } |
| 398 | return nil |
| 399 | } |
| 400 | if je.err != nil { |
| 401 | return xerrors.Errorf("acquire job: %w", je.err) |
| 402 | } |
| 403 | logger := s.Logger.With(slog.F("job_id", je.job.ID)) |
| 404 | logger.Debug(streamCtx, "locked job from database") |
| 405 | |
| 406 | if recvErr != nil { |
| 407 | logger.Error(streamCtx, "recv error and failed to cancel acquire job", slog.Error(recvErr)) |
| 408 | // Well, this is awkward. We hit an error receiving from the stream, but didn't cancel before we locked a job |
| 409 | // in the database. We need to mark this job as failed so the end user can retry if they want to. |
| 410 | now := s.timeNow() |
| 411 | err := s.Database.UpdateProvisionerJobWithCompleteByID( |
| 412 | //nolint:gocritic // Provisionerd has specific authz rules. |
| 413 | dbauthz.AsProvisionerd(context.Background()), |
| 414 | database.UpdateProvisionerJobWithCompleteByIDParams{ |
| 415 | ID: je.job.ID, |
| 416 | CompletedAt: sql.NullTime{ |
| 417 | Time: now, |
nothing calls this directly
no test coverage detected