MCPcopy
hub / github.com/celery/celery / TaskMessage

Function TaskMessage

celery/contrib/testing/mocks.py:11–42  ·  view source on GitHub ↗

Create task message in protocol 2 format.

(
    name,  # type: str
    id=None,  # type: str
    args=(),  # type: Sequence
    kwargs=None,  # type: Mapping
    callbacks=None,  # type: Sequence[Signature]
    errbacks=None,  # type: Sequence[Signature]
    chain=None,  # type: Sequence[Signature]
    shadow=None,  # type: str
    utc=None,  # type: bool
    **options  # type: Any
)

Source from the content-addressed store, hash-verified

9
10
def TaskMessage(
    name,  # type: str
    id=None,  # type: str
    args=(),  # type: Sequence
    kwargs=None,  # type: Mapping
    callbacks=None,  # type: Sequence[Signature]
    errbacks=None,  # type: Sequence[Signature]
    chain=None,  # type: Sequence[Signature]
    shadow=None,  # type: str
    utc=None,  # type: bool
    **options  # type: Any
):
    # type: (...) -> Any
    """Build a mock task message laid out in the protocol 2 format.

    The returned ``Mock`` carries:

    * ``headers`` -- a dict holding ``id``, ``task`` and ``shadow``, updated
      with any extra keyword ``options`` (which may override those keys).
    * ``content_type``, ``content_encoding``, ``body`` -- the three values
      returned by kombu's ``dumps`` for ``(args, kwargs, embed)`` using the
      ``json`` serializer.
    * ``payload`` -- the unserialized ``(args, kwargs, embed)`` tuple, where
      ``embed`` groups ``callbacks``, ``errbacks`` and ``chain``.

    A falsy ``id`` is replaced by a fresh ``celery.uuid()`` value and a falsy
    ``kwargs`` by an empty dict.  ``utc`` is accepted but not used here.
    """
    if not kwargs:
        kwargs = {}

    # Imported at call time, in this order, rather than at module level.
    from kombu.serialization import dumps

    from celery import uuid

    task_id = id if id else uuid()

    # Extra options are merged last so they can override the base headers.
    headers = {
        'id': task_id,
        'task': name,
        'shadow': shadow,
    }
    headers.update(options)

    embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain}
    payload = (args, kwargs, embed)
    content_type, content_encoding, body = dumps(payload, serializer='json')

    message = Mock(name=f'TaskMessage-{task_id}')
    message.headers = headers
    message.content_type = content_type
    message.content_encoding = content_encoding
    message.body = body
    message.payload = payload
    return message
43
44
45def TaskMessage1(

Callers 1

task_message_from_sig — Function · 0.85

Calls 1

update — Method · 0.45

Tested by

no test coverage detected