hub / github.com/celery/celery / test_reset_pidbox_node

Method test_reset_pidbox_node

t/unit/worker/test_worker.py:559–567
(self)

Source from the content-addressed store, hash-verified

557 self.buffer.get_nowait()
558
559     def test_reset_pidbox_node(self):
560         c = self.NoopConsumer()
561         con = find_step(c, consumer.Control).box
562         con.node = Mock()
563         chan = con.node.channel = Mock()
564         chan.close.side_effect = socket.error('foo')
565         c.connection_errors = (socket.error,)
566         con.reset()
567         chan.close.assert_called_with()
568
569     def test_reset_pidbox_node_green(self):
570         c = self.NoopConsumer(pool=Mock(is_green=True))
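
The listing above makes a mocked channel's close() raise a declared connection error and checks that reset() still returns normally after attempting the close. The same pattern can be sketched in isolation as below. PidboxSketch and run_sketch are hypothetical names invented for this illustration; the class is a stand-in for the object under test and is not Celery's implementation, which the listing does not show.

```python
import socket
from unittest.mock import Mock


class PidboxSketch:
    """Hypothetical stand-in for the object under test (not Celery code)."""

    def __init__(self, connection_errors):
        # Tuple of exception types to treat as harmless connection errors.
        self.connection_errors = connection_errors
        self.node = None

    def reset(self):
        # Assumed behavior: try to close the old channel and swallow only
        # the declared connection errors; anything else propagates.
        try:
            self.node.channel.close()
        except self.connection_errors:
            pass


def run_sketch():
    box = PidboxSketch(connection_errors=(socket.error,))
    box.node = Mock()
    chan = box.node.channel = Mock()
    # Every call to chan.close() now raises socket.error('foo').
    chan.close.side_effect = socket.error('foo')
    box.reset()  # must return normally despite the error
    # close() was attempted exactly as reset() would call it: no arguments.
    chan.close.assert_called_with()
    return chan.close.call_count
```

Calling run_sketch() exercises both halves of the check: the call completes without an exception, and assert_called_with() confirms that close() was actually attempted rather than skipped.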

Callers

nothing calls this directly

Calls 4

NoopConsumer · Method · 0.95
find_step · Function · 0.85
error · Method · 0.45
reset · Method · 0.45

Tested by

no test coverage detected