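Convert Task message protocol 1 arguments to protocol 2. Returns a tuple of ``(body, headers, already_decoded_status, utc)``, where ``body`` is the ``(args, kwargs, embed)`` triple and ``headers`` is the updated input body mapping.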
(message, body)
| 64 | |
| 65 | |
def proto1_to_proto2(message, body):
    """Translate a protocol 1 task message body into protocol 2 form.

    The ``body`` mapping is updated in place with ``argsrepr``,
    ``kwargsrepr`` and ``headers`` entries, plus a ``group`` entry copied
    from ``taskset`` when that key is present.

    Arguments:
        message: Message object; only its ``headers`` attribute is read.
        body: Decoded protocol 1 body (mutated in place).

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)`` where
        ``body`` is the ``(args, kwargs, embed)`` triple, ``headers`` is
        the updated input mapping, the decoded status is always ``True``
        and ``utc`` falls back to ``True`` when absent from the input.

    Raises:
        InvalidTaskError: If reading the arguments raises ``KeyError``, or
            if an attribute lookup fails (for example the keyword
            arguments object has no ``items`` attribute).
    """
    try:
        task_args = body.get('args', ())
        task_kwargs = body.get('kwargs', {})
        # Bare attribute access: a cheap check that kwargs is mapping-like.
        task_kwargs.items  # pylint: disable=pointless-statement
    except KeyError:
        raise InvalidTaskError('Message does not have args/kwargs')
    except AttributeError:
        raise InvalidTaskError(
            'Task keyword arguments must be a mapping',
        )

    # Evaluate every new value before touching ``body`` so a failure here
    # leaves the mapping unmodified.
    args_repr = saferepr(task_args)
    kwargs_repr = saferepr(task_kwargs)
    msg_headers = message.headers
    body.update({
        'argsrepr': args_repr,
        'kwargsrepr': kwargs_repr,
        'headers': msg_headers,
    })

    # Protocol 1 used the ``taskset`` key for what is stored as ``group``.
    try:
        taskset = body['taskset']
    except KeyError:
        pass
    else:
        body['group'] = taskset

    payload = (
        task_args,
        task_kwargs,
        {
            'callbacks': body.get('callbacks'),
            'errbacks': body.get('errbacks'),
            'chord': body.get('chord'),
            'chain': None,
        },
    )
    return payload, body, True, body.get('utc', True)
| 97 | |
| 98 | |
| 99 | def default(task, app, consumer, |