(self)
| 601 | assert x.connection.drain_events.call_count == 2 |
| 602 | |
def test_updates_qos_when_changed(self):
    """Check that ``qos.update`` is only invoked once ``qos.value`` changes.

    Runs ``synloop`` twice against the same ``X`` fixture: first with
    ``qos.prev`` equal to ``qos.value`` (no update expected), then after
    ``qos.value`` has been set to a different number (update expected).
    """
    harness = X(self.app)

    def run_loop_until_socket_error():
        # Re-arm drain_events via the fixture helper, then run the loop;
        # each run is expected to terminate with socket.error.
        # NOTE(review): presumably the helper makes drain_events time out
        # once and then raise -- confirm against X.timeout_then_error.
        harness.timeout_then_error(harness.connection.drain_events)
        with pytest.raises(socket.error):
            synloop(*harness.args)

    # Phase 1: previous and current QoS values match -> no update.
    harness.qos.prev = 2
    harness.qos.value = 2
    run_loop_until_socket_error()
    harness.qos.update.assert_not_called()

    # Phase 2: the current value is set to 4 -> update() called
    # with no arguments.
    harness.qos.value = 4
    run_loop_until_socket_error()
    harness.qos.update.assert_called_with()
| 617 | |
| 618 | def test_ignores_socket_errors_when_closed(self): |
| 619 | x = X(self.app) |
nothing calls this directly
no test coverage detected