MCPcopy
hub / github.com/celery/celery / X

Class X

t/unit/worker/test_loops.py:34–124  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

32
33
34class X:
35
36 def __init__(self, app, heartbeat=None, on_task_message=None,
37 transport_driver_type=None):
38 hub = Hub()
39 (
40 self.obj,
41 self.connection,
42 self.consumer,
43 self.blueprint,
44 self.hub,
45 self.qos,
46 self.heartbeat,
47 self.clock,
48 ) = self.args = [Mock(name='obj'),
49 Mock(name='connection'),
50 Mock(name='consumer'),
51 Mock(name='blueprint'),
52 hub,
53 Mock(name='qos'),
54 heartbeat,
55 Mock(name='clock')]
56 self.connection.supports_heartbeats = True
57 self.connection.get_heartbeat_interval.side_effect = (
58 lambda: self.heartbeat
59 )
60 self.consumer.callbacks = []
61 self.obj.strategies = {}
62 self.connection.connection_errors = (socket.error,)
63 if transport_driver_type:
64 self.connection.transport.driver_type = transport_driver_type
65 self.hub.readers = {}
66 self.hub.timer = Mock(name='hub.timer')
67 self.hub.timer._queue = [Mock()]
68 self.hub.fire_timers = Mock(name='hub.fire_timers')
69 self.hub.fire_timers.return_value = 1.7
70 self.hub.poller = Mock(name='hub.poller')
71 self.hub.close = Mock(name='hub.close()') # asynloop calls hub.close
72 self.Hub = self.hub
73 self.blueprint.state = RUN
74 # need this for create_task_handler
75 self._consumer = _consumer = Consumer(
76 Mock(), timer=Mock(), controller=Mock(), app=app)
77 _consumer.on_task_message = on_task_message or []
78 self.obj.create_task_handler = _consumer.create_task_handler
79 self.on_unknown_message = self.obj.on_unknown_message = Mock(
80 name='on_unknown_message',
81 )
82 _consumer.on_unknown_message = self.on_unknown_message
83 self.on_unknown_task = self.obj.on_unknown_task = Mock(
84 name='on_unknown_task',
85 )
86 _consumer.on_unknown_task = self.on_unknown_task
87 self.on_invalid_task = self.obj.on_invalid_task = Mock(
88 name='on_invalid_task',
89 )
90 _consumer.on_invalid_task = self.on_invalid_task
91 self.on_decode_error = self.obj.on_decode_error = Mock(

Calls

no outgoing calls