Create a task message from a :class:`celery.Signature`.

Example:
    >>> m = task_message_from_sig(app, add.s(2, 2))
    >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')
(app, sig, utc=True, TaskMessage=TaskMessage)
| 77 | |
| 78 | |
def task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage):
    # type: (Celery, Signature, bool, Any) -> Any
    """Create task message from :class:`celery.Signature`.

    The signature is frozen with ``sig.freeze()`` and its options are
    translated into keyword arguments for ``TaskMessage``:

    * ``link`` / ``link_error`` become ``callbacks`` / ``errbacks``
      (each entry converted with ``dict()``), or ``None`` when empty.
    * ``countdown`` (seconds) is converted to an ``eta`` relative to
      ``app.now()``; when it is truthy it takes precedence over ``eta``.
    * a numeric ``expires`` (seconds) is converted to a datetime relative
      to ``app.now()``.
    * datetime ``eta`` / ``expires`` values are passed as ISO-8601 strings.
    * every remaining option is forwarded unchanged as a keyword argument.

    Apart from the ``sig.freeze()`` call, ``sig.options`` is not modified:
    the options are read from a shallow copy.

    Arguments:
        app: Object providing ``now()``, used as the base time for
            relative ``countdown`` and ``expires`` values.
        sig: Signature providing ``freeze()``, ``options``, ``task``,
            ``id``, ``args`` and ``kwargs``.
        utc: Forwarded unchanged to ``TaskMessage`` as ``utc``.
        TaskMessage: Callable used to build the message.

    Returns:
        The object returned by ``TaskMessage``.

    Example:
        >>> m = task_message_from_sig(app, add.s(2, 2))
        >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')
    """
    sig.freeze()
    # Work on a shallow copy so that building a message does not strip
    # callbacks and scheduling options from the caller's signature.
    options = dict(sig.options)
    callbacks = options.pop('link', None)
    errbacks = options.pop('link_error', None)
    countdown = options.pop('countdown', None)
    # Always remove ``eta`` from the forwarded options: leaving it in place
    # when ``countdown`` is set would pass ``eta`` twice to ``TaskMessage``
    # and raise ``TypeError``.
    eta = options.pop('eta', None)
    if countdown:
        eta = app.now() + timedelta(seconds=countdown)
    if eta and isinstance(eta, datetime):
        eta = eta.isoformat()
    expires = options.pop('expires', None)
    if expires and isinstance(expires, numbers.Real):
        expires = app.now() + timedelta(seconds=expires)
    if expires and isinstance(expires, datetime):
        expires = expires.isoformat()
    return TaskMessage(
        sig.task, id=sig.id, args=sig.args,
        kwargs=sig.kwargs,
        callbacks=[dict(s) for s in callbacks] if callbacks else None,
        errbacks=[dict(s) for s in errbacks] if errbacks else None,
        eta=eta,
        expires=expires,
        utc=utc,
        **options
    )
| 112 | |
| 113 | |
| 114 | class _ContextMock(Mock): |
nothing calls this directly
no test coverage detected