MCPcopy
hub / github.com/celery/celery / test_updates_qos

Method test_updates_qos

t/unit/worker/test_loops.py:249–264  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

247 state.should_stop = None
248
249 def test_updates_qos(self):
250 x = X(self.app)
251 x.qos.prev = 3
252 x.qos.value = 3
253 x.hub.on_tick.add(x.closer(mod=2))
254 x.hub.timer._queue = [1]
255 asynloop(*x.args)
256 x.qos.update.assert_not_called()
257
258 x = X(self.app)
259 x.qos.prev = 1
260 x.qos.value = 6
261 x.hub.on_tick.add(x.closer(mod=2))
262 asynloop(*x.args)
263 x.qos.update.assert_called_with()
264 x.hub.fire_timers.assert_called_with(propagate=(socket.error,))
265
266 def test_poll_empty(self):
267 x = X(self.app)

Callers

nothing calls this directly

Calls 4

closerMethod · 0.95
asynloopFunction · 0.90
XClass · 0.70
addMethod · 0.45

Tested by

no test coverage detected