MCPcopy
hub / github.com/celery/celery / assert_next_task_data_equal

Method assert_next_task_data_equal

t/unit/tasks/test_tasks.py:978–1003  ·  view source on GitHub ↗
(self, consumer, presult, task_name,
                                    test_eta=False, test_expires=False,
                                    properties=None, headers=None, **kwargs)

Source from the content-addressed store, hash-verified

976 assert result.id == task_id
977
978 def assert_next_task_data_equal(self, consumer, presult, task_name,
979 test_eta=False, test_expires=False,
980 properties=None, headers=None, **kwargs):
981 next_task = consumer.queues[0].get(accept=['pickle', 'json'])
982 task_properties = next_task.properties
983 task_headers = next_task.headers
984 task_body = next_task.decode()
985 task_args, task_kwargs, embed = task_body
986 assert task_headers['id'] == presult.id
987 assert task_headers['task'] == task_name
988 if test_eta:
989 assert isinstance(task_headers.get('eta'), str)
990 to_datetime = datetime.fromisoformat(task_headers.get('eta'))
991 assert isinstance(to_datetime, datetime)
992 if test_expires:
993 assert isinstance(task_headers.get('expires'), str)
994 to_datetime = datetime.fromisoformat(task_headers.get('expires'))
995 assert isinstance(to_datetime, datetime)
996 properties = properties or {}
997 for arg_name, arg_value in properties.items():
998 assert task_properties.get(arg_name) == arg_value
999 headers = headers or {}
1000 for arg_name, arg_value in headers.items():
1001 assert task_headers.get(arg_name) == arg_value
1002 for arg_name, arg_value in kwargs.items():
1003 assert task_kwargs.get(arg_name) == arg_value
1004
1005 def test_incomplete_task_cls(self):
1006

Callers 1

test_regular_taskMethod · 0.95

Calls 3

getMethod · 0.45
decodeMethod · 0.45
itemsMethod · 0.45

Tested by

no test coverage detected