(self)
| 557 | self.buffer.get_nowait() |
| 558 | |
def test_reset_pidbox_node(self):
    """Check that resetting the pidbox closes the node's channel.

    The channel's ``close()`` is rigged to raise ``socket.error``; the
    test passes only if ``reset()`` still returns normally and
    ``close()`` was invoked with no arguments.
    """
    worker = self.NoopConsumer()
    pidbox = find_step(worker, consumer.Control).box

    # Channel whose close() fails with a socket-level error.
    channel = Mock()
    channel.close.side_effect = socket.error('foo')

    # Swap in a mocked node that owns the failing channel.
    pidbox.node = Mock()
    pidbox.node.channel = channel

    # NOTE(review): presumably reset() consults the consumer's
    # connection_errors to decide which exceptions to swallow -- confirm.
    worker.connection_errors = (socket.error,)

    pidbox.reset()

    channel.close.assert_called_with()
| 568 | |
| 569 | def test_reset_pidbox_node_green(self): |
| 570 | c = self.NoopConsumer(pool=Mock(is_green=True)) |
nothing calls this directly
no test coverage detected