(self)
| 674 | assert excinfo.value.code == 0 |
| 675 | |
| 676 | def test_panel_reply(self): |
| 677 | |
| 678 | replies = [] |
| 679 | |
| 680 | class _Node(pidbox.Node): |
| 681 | |
| 682 | def reply(self, data, exchange, routing_key, **kwargs): |
| 683 | replies.append(data) |
| 684 | |
| 685 | panel = _Node( |
| 686 | hostname=hostname, |
| 687 | state=self.create_state(consumer=Consumer(self.app)), |
| 688 | handlers=control.Panel.data, |
| 689 | mailbox=self.app.control.mailbox, |
| 690 | ) |
| 691 | r = panel.dispatch('ping', reply_to={ |
| 692 | 'exchange': 'x', |
| 693 | 'routing_key': 'x', |
| 694 | }) |
| 695 | assert r == {'ok': 'pong'} |
| 696 | assert replies[0] == {panel.hostname: {'ok': 'pong'}} |
| 697 | |
| 698 | def test_pool_restart(self): |
| 699 | consumer = Consumer(self.app) |
nothing calls this directly
no test coverage detected