We want to republish t1 to ensure that the chain is executed twice
(sender=None, body=None, exchange=None, routing_key=None, headers=None,
properties=None,
declare=None, retry_policy=None, **kwargs)
| 859 | # Republish t1 to cause the chain to be executed twice |
| 860 | @before_task_publish.connect |
| 861 | def before_task_publish_handler(sender=None, body=None, exchange=None, routing_key=None, headers=None, |
| 862 | properties=None, |
| 863 | declare=None, retry_policy=None, **kwargs): |
| 864 | """ We want to republish t1 to ensure that the chain is executed twice """ |
| 865 | |
| 866 | metadata = { |
| 867 | 'body': body, |
| 868 | 'exchange': exchange, |
| 869 | 'routing_key': routing_key, |
| 870 | 'properties': properties, |
| 871 | 'headers': headers, |
| 872 | } |
| 873 | |
| 874 | with celery_session_app.producer_pool.acquire(block=True) as producer: |
| 875 | # Publish t1 to the message broker, just before it's going to be published which causes duplication |
| 876 | return producer.publish( |
| 877 | metadata['body'], |
| 878 | exchange=metadata['exchange'], |
| 879 | routing_key=metadata['routing_key'], |
| 880 | retry=None, |
| 881 | retry_policy=retry_policy, |
| 882 | serializer='json', |
| 883 | delivery_mode=None, |
| 884 | headers=headers, |
| 885 | **kwargs |
| 886 | ) |
| 887 | |
| 888 | # Clean redis key |
| 889 | redis_connection = get_redis_connection() |