Ensure obj is a signature, or None. Arguments: d (Optional[Union[abstract.CallableSignature, Mapping]]): Signature or dict-serialized signature. app (celery.Celery): App to bind signature to. clone (bool): If ``d`` is already a signature, the signature will be cloned when this flag is enabled.
maybe_signature(d, app=None, clone=False)

def maybe_signature(d, app=None, clone=False):
    """Ensure obj is a signature, or None.

    Arguments:
        d (Optional[Union[abstract.CallableSignature, Mapping]]):
            Signature or dict-serialized signature.
        app (celery.Celery):
            App to bind signature to.
        clone (bool):
            If ``d`` is already a signature, the signature
            will be cloned when this flag is enabled.

    Returns:
        Optional[abstract.CallableSignature]
    """
    if d is not None:
        if isinstance(d, abstract.CallableSignature):
            if clone:
                d = d.clone()
        elif isinstance(d, dict):
            d = signature(d)

        if app is not None:
            d._app = app
    return d


maybe_subtask = maybe_signature  # XXX compat
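
The branching in `maybe_signature` can be tried out without a Celery installation by using a stand-in class. The sketch below is illustrative only: `StubSignature` and `maybe_stub_signature` are hypothetical names that are not part of Celery, and the string `"my-app"` merely takes the place of a real app object. It assumes that a signature behaves like a dict with a `clone()` method and an `_app` attribute.

```python
from typing import Any


class StubSignature(dict):
    """Hypothetical stand-in for a signature object (not a Celery class)."""

    _app: Any = None

    def clone(self) -> "StubSignature":
        # Build a new object holding a shallow copy of the stored fields.
        copy = StubSignature(self)
        copy._app = self._app
        return copy


def maybe_stub_signature(d, app=None, clone=False):
    """Follow the same branches as maybe_signature, using the stand-in."""
    if d is not None:
        # The signature check comes first because the stand-in is a dict too.
        if isinstance(d, StubSignature):
            if clone:
                d = d.clone()
        elif isinstance(d, dict):
            # A plain dict is treated as a serialized signature.
            d = StubSignature(d)

        if app is not None:
            d._app = app
    return d


# A plain dict is converted and bound to the given app.
sig = maybe_stub_signature({"task": "tasks.add", "args": (2, 2)}, app="my-app")
# An existing signature is returned unchanged unless clone=True.
same = maybe_stub_signature(sig)
copied = maybe_stub_signature(sig, clone=True)
# None passes straight through.
nothing = maybe_stub_signature(None, app="my-app")
```

With `clone=False` the caller gets back the same object it passed in, so later mutations are shared; passing `clone=True` gives an independent copy, which is the safer choice when the result will be modified.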