MCPcopy
hub / github.com/celery/celery / _task_from_fun

Method _task_from_fun

celery/app/base.py:586–629  ·  view source on GitHub ↗
(
        self,
        fun,
        name=None,
        base=None,
        bind=False,
        pydantic: bool = False,
        pydantic_strict: bool = False,
        pydantic_context: typing.Optional[typing.Dict[str, typing.Any]] = None,
        pydantic_dump_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None,
        **options,
    )

Source from the content-addressed store, hash-verified

584 return staticmethod(head_from_fun(fun, bound=bound))
585
586 def _task_from_fun(
587 self,
588 fun,
589 name=None,
590 base=None,
591 bind=False,
592 pydantic: bool = False,
593 pydantic_strict: bool = False,
594 pydantic_context: typing.Optional[typing.Dict[str, typing.Any]] = None,
595 pydantic_dump_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None,
596 **options,
597 ):
598 if not self.finalized and not self.autofinalize:
599 raise RuntimeError('Contract breach: app not finalized')
600 name = name or self.gen_task_name(fun.__name__, fun.__module__)
601 base = base or self.Task
602
603 if name not in self._tasks:
604 if pydantic is True:
605 fun = pydantic_wrapper(self, fun, name, pydantic_strict, pydantic_context, pydantic_dump_kwargs)
606
607 run = fun if bind else staticmethod(fun)
608 task = type(fun.__name__, (base,), dict({
609 'app': self,
610 'name': name,
611 'run': run,
612 '_decorated': True,
613 '__doc__': fun.__doc__,
614 '__module__': fun.__module__,
615 '__annotations__': _get_annotations(fun),
616 '__header__': self.type_checker(fun, bound=bind),
617 '__wrapped__': run}, **options))()
618 # for some reason __qualname__ cannot be set in type()
619 # so we have to set it here.
620 try:
621 task.__qualname__ = fun.__qualname__
622 except AttributeError:
623 pass
624 self._tasks[task.name] = task
625 task.bind(self) # connects task to this app
626 add_autoretry_behaviour(task, **options)
627 else:
628 task = self._tasks[name]
629 return task
630
631 def register_task(self, task, **options):
632 """Utility for registering a task-based class.

Callers 3

_create_task_clsMethod · 0.95
consMethod · 0.80
__innerFunction · 0.80

Calls 6

gen_task_nameMethod · 0.95
type_checkerMethod · 0.95
pydantic_wrapperFunction · 0.85
_get_annotationsFunction · 0.85
add_autoretry_behaviourFunction · 0.85
bindMethod · 0.80

Tested by

no test coverage detected