MCPcopy
hub / github.com/celery/celery / task_message_from_sig

Function task_message_from_sig

celery/contrib/testing/mocks.py:79–111  ·  view source on GitHub ↗

Create task message from :class:`celery.Signature`. Example: >>> m = task_message_from_sig(app, add.s(2, 2)) >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')

(app, sig, utc=True, TaskMessage=TaskMessage)

Source from the content-addressed store, hash-verified

77
78
def task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage):
    # type: (Celery, Signature, bool, Any) -> Any
    """Create task message from :class:`celery.Signature`.

    The signature is frozen with ``sig.freeze()`` and its task name, id,
    args, kwargs and remaining options are handed to ``TaskMessage``.

    Arguments:
        app: Application instance; only ``app.now()`` is used, to turn a
            relative ``countdown``/``expires`` into an absolute time.
        sig: Signature to convert.  ``sig.freeze()`` is called on it; its
            ``options`` mapping is read from a copy and is not modified
            by this function.
        utc (bool): Forwarded unchanged to ``TaskMessage``.
        TaskMessage (Callable): Factory used to build the message.

    Returns:
        Any: Whatever ``TaskMessage`` returns.

    Example:
        >>> m = task_message_from_sig(app, add.s(2, 2))
        >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')
    """
    sig.freeze()
    # Copy *after* freezing so anything ``freeze()`` stored in the options
    # is still forwarded, while the caller's signature keeps its own
    # link/countdown/eta/expires entries.
    options = dict(sig.options)
    callbacks = options.pop('link', None)
    errbacks = options.pop('link_error', None)
    countdown = options.pop('countdown', None)
    # Always remove ``eta`` from the forwarded options: if it were left
    # behind when ``countdown`` is set, ``TaskMessage`` would receive the
    # ``eta`` keyword twice and raise ``TypeError``.
    eta = options.pop('eta', None)
    if countdown:
        # A countdown takes precedence over an explicit eta.
        eta = app.now() + timedelta(seconds=countdown)
    if isinstance(eta, datetime):
        eta = eta.isoformat()
    expires = options.pop('expires', None)
    if expires and isinstance(expires, numbers.Real):
        # Numeric expiry is interpreted as seconds from now.
        expires = app.now() + timedelta(seconds=expires)
    if isinstance(expires, datetime):
        expires = expires.isoformat()
    return TaskMessage(
        sig.task,
        id=sig.id,
        args=sig.args,
        kwargs=sig.kwargs,
        callbacks=[dict(s) for s in callbacks] if callbacks else None,
        errbacks=[dict(s) for s in errbacks] if errbacks else None,
        eta=eta,
        expires=expires,
        utc=utc,
        **options
    )
112
113
114class _ContextMock(Mock):

Callers

nothing calls this directly

Calls 4

TaskMessageFunction · 0.85
freezeMethod · 0.45
popMethod · 0.45
nowMethod · 0.45

Tested by

no test coverage detected