MCPcopy
hub / github.com/celery/celery / test_start__loop

Method test_start__loop

t/unit/worker/test_worker.py:658–695  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

656 cont.stop(c)
657
def test_start__loop(self):
    """Errors raised by the consumer's ``loop`` must escape ``start()``.

    Two scenarios are exercised against a ``NoopConsumer``:

    * a loop callable that raises ``KeyError`` on its second invocation;
      afterwards the loop must have run exactly twice and the stub QoS
      must have ``prev`` equal to ``value``.
    * a mocked loop that raises ``socket.error`` whenever it is called;
      the test then checks that the loop was actually invoked.
    """

    class _QoS:
        # Minimal QoS stand-in that starts out of sync (prev != value).
        prev = 3
        value = 4

        def update(self):
            self.prev = self.value

    init_callback = Mock(name='init_callback')

    def make_consumer(**options):
        # Build a consumer wired to a fresh stub QoS and a connection
        # whose heartbeat interval lookup is mocked to return None.
        consumer = self.NoopConsumer(**options, init_callback=init_callback)
        consumer.qos = _QoS()
        consumer.connection = Connection(self.app.conf.broker_url)
        consumer.connection.get_heartbeat_interval = Mock(return_value=None)
        return consumer

    # --- Scenario 1: loop raises KeyError on its second call -----------
    failing = make_consumer()
    failing.iterations = 0

    def loop_failing_on_second_call(*args, **kwargs):
        failing.iterations += 1
        qos = failing.qos
        if qos.prev != qos.value:
            qos.update()
        if failing.iterations >= 2:
            raise KeyError('foo')

    failing.loop = loop_failing_on_second_call
    with pytest.raises(KeyError):
        failing.start()
    assert failing.iterations == 2
    assert failing.qos.prev == failing.qos.value

    # --- Scenario 2: loop raises socket.error --------------------------
    init_callback.reset_mock()
    erroring = make_consumer(task_events=False)
    erroring.loop = Mock(side_effect=socket.error('foo'))
    with pytest.raises(socket.error):
        erroring.start()
    erroring.loop.assert_called()
696
697 def test_reset_connection_with_no_node(self):
698 c = self.NoopConsumer()

Callers

nothing calls this directly

Calls 6

NoopConsumer · Method · 0.95
_QoS · Class · 0.85
Connection · Class · 0.70
raises · Method · 0.45
start · Method · 0.45
error · Method · 0.45

Tested by

no test coverage detected