(self)
| 200 | c.hub._ready.add.assert_called_with(c._flush_events) |
| 201 | |
| 202 | def test_schedule_bucket_request(self): |
| 203 | c = self.get_consumer() |
| 204 | c.timer = Mock() |
| 205 | |
| 206 | bucket = Mock() |
| 207 | request = Mock() |
| 208 | bucket.pop = lambda: bucket.contents.popleft() |
| 209 | bucket.can_consume.return_value = True |
| 210 | bucket.contents = deque() |
| 211 | |
| 212 | with patch( |
| 213 | 'celery.worker.consumer.consumer.Consumer._limit_move_to_pool' |
| 214 | ) as task_reserved: |
| 215 | bucket.contents.append((request, 3)) |
| 216 | c._schedule_bucket_request(bucket) |
| 217 | bucket.can_consume.assert_called_with(3) |
| 218 | task_reserved.assert_called_with(request) |
| 219 | |
| 220 | bucket.can_consume.return_value = False |
| 221 | bucket.contents = deque() |
| 222 | bucket.expected_time.return_value = 3.33 |
| 223 | bucket.contents.append((request, 4)) |
| 224 | limit_order = c._limit_order |
| 225 | c._schedule_bucket_request(bucket) |
| 226 | assert c._limit_order == limit_order + 1 |
| 227 | bucket.can_consume.assert_called_with(4) |
| 228 | c.timer.call_after.assert_called_with( |
| 229 | 3.33, c._schedule_bucket_request, (bucket,), |
| 230 | priority=c._limit_order, |
| 231 | ) |
| 232 | bucket.expected_time.assert_called_with(4) |
| 233 | assert bucket.pop() == (request, 4) |
| 234 | |
| 235 | bucket.contents = deque() |
| 236 | bucket.can_consume.reset_mock() |
| 237 | c._schedule_bucket_request(bucket) |
| 238 | bucket.can_consume.assert_not_called() |
| 239 | |
| 240 | def test_limit_task(self): |
| 241 | c = self.get_consumer() |
nothing calls this directly
no test coverage detected