MCPcopy
hub / github.com/celery/celery / test_schedule_bucket_request

Method test_schedule_bucket_request

t/unit/worker/test_consumer.py:202–238  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

200 c.hub._ready.add.assert_called_with(c._flush_events)
201
def test_schedule_bucket_request(self):
    """Check ``_schedule_bucket_request`` against a mocked bucket.

    Three scenarios run in sequence on the same consumer:

    1. ``can_consume`` returns True: the queued request is passed to
       the patched ``Consumer._limit_move_to_pool``.
    2. ``can_consume`` returns False: ``_limit_order`` grows by one, a
       retry is registered through ``timer.call_after`` using the delay
       from ``expected_time``, and the entry can still be popped from
       the bucket afterwards.
    3. The bucket is empty: ``can_consume`` is never called.
    """
    consumer = self.get_consumer()
    consumer.timer = Mock()

    req = Mock()
    limiter = Mock()
    limiter.contents = deque()
    limiter.can_consume.return_value = True

    def pop_head():
        # Resolve ``contents`` on every call: the deque is replaced with
        # a fresh one between scenarios.
        return limiter.contents.popleft()

    limiter.pop = pop_head

    target = 'celery.worker.consumer.consumer.Consumer._limit_move_to_pool'
    with patch(target) as move_to_pool:
        # Scenario 1: the bucket allows consumption.
        limiter.contents.append((req, 3))
        consumer._schedule_bucket_request(limiter)
        limiter.can_consume.assert_called_with(3)
        move_to_pool.assert_called_with(req)

    # Scenario 2: the bucket refuses consumption.
    limiter.can_consume.return_value = False
    limiter.contents = deque([(req, 4)])
    limiter.expected_time.return_value = 3.33
    order_before = consumer._limit_order
    consumer._schedule_bucket_request(limiter)
    assert consumer._limit_order == order_before + 1
    limiter.can_consume.assert_called_with(4)
    consumer.timer.call_after.assert_called_with(
        3.33,
        consumer._schedule_bucket_request,
        (limiter,),
        priority=consumer._limit_order,
    )
    limiter.expected_time.assert_called_with(4)
    # The refused entry must still be retrievable from the bucket.
    assert limiter.pop() == (req, 4)

    # Scenario 3: nothing queued, so no consumption check happens.
    limiter.contents = deque()
    limiter.can_consume.reset_mock()
    consumer._schedule_bucket_request(limiter)
    limiter.can_consume.assert_not_called()
239
240 def test_limit_task(self):
241 c = self.get_consumer()

Callers

nothing calls this directly

Calls 3

get_consumer — Method · 0.45
pop — Method · 0.45

Tested by

no test coverage detected