MCPcopy
hub / github.com/celery/celery / before_task_publish_handler

Method before_task_publish_handler

t/integration/test_canvas.py:861–886  ·  view source on GitHub ↗

We want to republish t1 to ensure that the chain is executed twice

(sender=None, body=None, exchange=None, routing_key=None, headers=None,
                                        properties=None,
                                        declare=None, retry_policy=None, **kwargs)

Source from the content-addressed store, hash-verified

859 # Republish t1 to cause the chain to be executed twice
860 @before_task_publish.connect
861 def before_task_publish_handler(sender=None, body=None, exchange=None, routing_key=None, headers=None,
862 properties=None,
863 declare=None, retry_policy=None, **kwargs):
864 """ We want to republish t1 to ensure that the chain is executed twice """
865
866 metadata = {
867 'body': body,
868 'exchange': exchange,
869 'routing_key': routing_key,
870 'properties': properties,
871 'headers': headers,
872 }
873
874 with celery_session_app.producer_pool.acquire(block=True) as producer:
875 # Publish t1 to the message broker, just before it's going to be published which causes duplication
876 return producer.publish(
877 metadata['body'],
878 exchange=metadata['exchange'],
879 routing_key=metadata['routing_key'],
880 retry=None,
881 retry_policy=retry_policy,
882 serializer='json',
883 delivery_mode=None,
884 headers=headers,
885 **kwargs
886 )
887
888 # Clean redis key
889 redis_connection = get_redis_connection()

Callers

nothing calls this directly

Calls 2

acquireMethod · 0.80
publishMethod · 0.45

Tested by

no test coverage detected