MCPcopy
hub / github.com/celery/celery / test_panel_reply

Method test_panel_reply

t/unit/worker/test_control.py:676–696  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

674 assert excinfo.value.code == 0
675
def test_panel_reply(self):
    """Check that dispatching ``ping`` with ``reply_to`` publishes a reply.

    A ``pidbox.Node`` subclass overrides ``reply`` so that the payload is
    recorded in a local list instead of being sent anywhere.  The test
    then asserts both the value returned by ``dispatch`` and the payload
    handed to ``reply``, which is keyed by the node's hostname.
    """
    captured = []

    class _Node(pidbox.Node):
        # Parameter names mirror the overridden method so keyword calls
        # made by ``dispatch`` keep working.
        def reply(self, data, exchange, routing_key, **kwargs):
            captured.append(data)

    state = self.create_state(consumer=Consumer(self.app))
    panel = _Node(
        hostname=hostname,
        state=state,
        handlers=control.Panel.data,
        mailbox=self.app.control.mailbox,
    )

    reply_to = {'exchange': 'x', 'routing_key': 'x'}
    result = panel.dispatch('ping', reply_to=reply_to)

    pong = {'ok': 'pong'}
    assert result == pong
    assert captured[0] == {panel.hostname: pong}
697
698 def test_pool_restart(self):
699 consumer = Consumer(self.app)

Callers

nothing calls this directly

Calls 3

create_state · Method · 0.95
_Node · Class · 0.85
Consumer · Class · 0.70

Tested by

no test coverage detected