MCPcopy
hub / github.com/celery/celery / proto1_to_proto2

Function proto1_to_proto2

celery/worker/strategy.py:66–96  ·  view source on GitHub ↗

Convert Task message protocol 1 arguments to protocol 2. Returns: Tuple: of ``(body, headers, already_decoded_status, utc)``

(message, body)

Source from the content-addressed store, hash-verified

64
65
66def proto1_to_proto2(message, body):
67 """Convert Task message protocol 1 arguments to protocol 2.
68
69 Returns:
70 Tuple: of ``(body, headers, already_decoded_status, utc)``
71 """
72 try:
73 args, kwargs = body.get('args', ()), body.get('kwargs', {})
74 kwargs.items # pylint: disable=pointless-statement
75 except KeyError:
76 raise InvalidTaskError('Message does not have args/kwargs')
77 except AttributeError:
78 raise InvalidTaskError(
79 'Task keyword arguments must be a mapping',
80 )
81 body.update(
82 argsrepr=saferepr(args),
83 kwargsrepr=saferepr(kwargs),
84 headers=message.headers,
85 )
86 try:
87 body['group'] = body['taskset']
88 except KeyError:
89 pass
90 embed = {
91 'callbacks': body.get('callbacks'),
92 'errbacks': body.get('errbacks'),
93 'chord': body.get('chord'),
94 'chain': None,
95 }
96 return (args, kwargs, embed), body, True, body.get('utc', True)
97
98
99def default(task, app, consumer,

Callers 6

test_messageMethod · 0.90
task_message_handlerFunction · 0.85

Calls 4

InvalidTaskErrorClass · 0.90
safereprFunction · 0.90
getMethod · 0.45
updateMethod · 0.45

Tested by 5

test_messageMethod · 0.72