(self, consumer, presult, task_name,
test_eta=False, test_expires=False,
properties=None, headers=None, **kwargs)
| 976 | assert result.id == task_id |
| 977 | |
| 978 | def assert_next_task_data_equal(self, consumer, presult, task_name, |
| 979 | test_eta=False, test_expires=False, |
| 980 | properties=None, headers=None, **kwargs): |
| 981 | next_task = consumer.queues[0].get(accept=['pickle', 'json']) |
| 982 | task_properties = next_task.properties |
| 983 | task_headers = next_task.headers |
| 984 | task_body = next_task.decode() |
| 985 | task_args, task_kwargs, embed = task_body |
| 986 | assert task_headers['id'] == presult.id |
| 987 | assert task_headers['task'] == task_name |
| 988 | if test_eta: |
| 989 | assert isinstance(task_headers.get('eta'), str) |
| 990 | to_datetime = datetime.fromisoformat(task_headers.get('eta')) |
| 991 | assert isinstance(to_datetime, datetime) |
| 992 | if test_expires: |
| 993 | assert isinstance(task_headers.get('expires'), str) |
| 994 | to_datetime = datetime.fromisoformat(task_headers.get('expires')) |
| 995 | assert isinstance(to_datetime, datetime) |
| 996 | properties = properties or {} |
| 997 | for arg_name, arg_value in properties.items(): |
| 998 | assert task_properties.get(arg_name) == arg_value |
| 999 | headers = headers or {} |
| 1000 | for arg_name, arg_value in headers.items(): |
| 1001 | assert task_headers.get(arg_name) == arg_value |
| 1002 | for arg_name, arg_value in kwargs.items(): |
| 1003 | assert task_kwargs.get(arg_name) == arg_value |
| 1004 | |
| 1005 | def test_incomplete_task_cls(self): |
| 1006 |
no test coverage detected