| 247 | state.should_stop = None |
| 248 | |
| 249 | def test_updates_qos(self): |
| 250 | x = X(self.app) |
| 251 | x.qos.prev = 3 |
| 252 | x.qos.value = 3 |
| 253 | x.hub.on_tick.add(x.closer(mod=2)) |
| 254 | x.hub.timer._queue = [1] |
| 255 | asynloop(*x.args) |
| 256 | x.qos.update.assert_not_called() |
| 257 | |
| 258 | x = X(self.app) |
| 259 | x.qos.prev = 1 |
| 260 | x.qos.value = 6 |
| 261 | x.hub.on_tick.add(x.closer(mod=2)) |
| 262 | asynloop(*x.args) |
| 263 | x.qos.update.assert_called_with() |
| 264 | x.hub.fire_timers.assert_called_with(propagate=(socket.error,)) |
| 265 | |
| 266 | def test_poll_empty(self): |
| 267 | x = X(self.app) |