MCPcopy
hub / github.com/celery/celery / test_run

Method test_run

t/unit/worker/test_autoscale.py:121–135  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

119 assert x.pool.num_processes == 3
120
def test_run(self):
    """Drive one pass of a stub autoscaler and verify its end state.

    ``body`` is overridden to record that it was called and to set the
    shutdown event.  After ``run()`` returns, both the shutdown and the
    stopped events must be set and the call must have been recorded.
    """

    class Scaler(autoscale.Autoscaler):
        # Flipped to True by body() so the test can confirm it ran.
        scale_called = False

        def body(self):
            self.scale_called = True
            # Setting the shutdown event from inside the body is
            # presumably what lets run() return after a single pass.
            self._bgThread__is_shutdown.set()

    scaler = Scaler(self.pool, 10, 3, worker=Mock(name='worker'))
    scaler.run()

    # Checked in the same order as before: shutdown first, then stopped.
    for event_attr in ("_bgThread__is_shutdown", "_bgThread__is_stopped"):
        assert getattr(scaler, event_attr).is_set()
    assert scaler.scale_called
136
137 def test_shrink_raises_exception(self):
138 worker = Mock(name='worker')

Callers

nothing calls this directly

Calls 3

Scaler (Class) · 0.85
is_set (Method) · 0.80
run (Method) · 0.45

Tested by

no test coverage detected