MCPcopy
hub / github.com/celery/celery / test_updates_qos_when_changed

Method test_updates_qos_when_changed

t/unit/worker/test_loops.py:603–616  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

601 assert x.connection.drain_events.call_count == 2
602
603 def test_updates_qos_when_changed(self):
604 x = X(self.app)
605 x.qos.prev = 2
606 x.qos.value = 2
607 x.timeout_then_error(x.connection.drain_events)
608 with pytest.raises(socket.error):
609 synloop(*x.args)
610 x.qos.update.assert_not_called()
611
612 x.qos.value = 4
613 x.timeout_then_error(x.connection.drain_events)
614 with pytest.raises(socket.error):
615 synloop(*x.args)
616 x.qos.update.assert_called_with()
617
618 def test_ignores_socket_errors_when_closed(self):
619 x = X(self.app)

Callers

nothing calls this directly

Calls 4

timeout_then_errorMethod · 0.95
synloopFunction · 0.90
XClass · 0.70
raisesMethod · 0.45

Tested by

no test coverage detected