(
self,
fun,
name=None,
base=None,
bind=False,
pydantic: bool = False,
pydantic_strict: bool = False,
pydantic_context: typing.Optional[typing.Dict[str, typing.Any]] = None,
pydantic_dump_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None,
**options,
)
| 584 | return staticmethod(head_from_fun(fun, bound=bound)) |
| 585 | |
| 586 | def _task_from_fun( |
| 587 | self, |
| 588 | fun, |
| 589 | name=None, |
| 590 | base=None, |
| 591 | bind=False, |
| 592 | pydantic: bool = False, |
| 593 | pydantic_strict: bool = False, |
| 594 | pydantic_context: typing.Optional[typing.Dict[str, typing.Any]] = None, |
| 595 | pydantic_dump_kwargs: typing.Optional[typing.Dict[str, typing.Any]] = None, |
| 596 | **options, |
| 597 | ): |
| 598 | if not self.finalized and not self.autofinalize: |
| 599 | raise RuntimeError('Contract breach: app not finalized') |
| 600 | name = name or self.gen_task_name(fun.__name__, fun.__module__) |
| 601 | base = base or self.Task |
| 602 | |
| 603 | if name not in self._tasks: |
| 604 | if pydantic is True: |
| 605 | fun = pydantic_wrapper(self, fun, name, pydantic_strict, pydantic_context, pydantic_dump_kwargs) |
| 606 | |
| 607 | run = fun if bind else staticmethod(fun) |
| 608 | task = type(fun.__name__, (base,), dict({ |
| 609 | 'app': self, |
| 610 | 'name': name, |
| 611 | 'run': run, |
| 612 | '_decorated': True, |
| 613 | '__doc__': fun.__doc__, |
| 614 | '__module__': fun.__module__, |
| 615 | '__annotations__': _get_annotations(fun), |
| 616 | '__header__': self.type_checker(fun, bound=bind), |
| 617 | '__wrapped__': run}, **options))() |
| 618 | # for some reason __qualname__ cannot be set in type() |
| 619 | # so we have to set it here. |
| 620 | try: |
| 621 | task.__qualname__ = fun.__qualname__ |
| 622 | except AttributeError: |
| 623 | pass |
| 624 | self._tasks[task.name] = task |
| 625 | task.bind(self) # connects task to this app |
| 626 | add_autoretry_behaviour(task, **options) |
| 627 | else: |
| 628 | task = self._tasks[name] |
| 629 | return task |
| 630 | |
| 631 | def register_task(self, task, **options): |
| 632 | """Utility for registering a task-based class. |
no test coverage detected