(self)
| 262 | return c.task_consumer.on_message |
| 263 | |
| 264 | def test_receieve_message(self): |
| 265 | c = self.LoopConsumer() |
| 266 | c.blueprint.state = RUN |
| 267 | m = self.create_task_message( |
| 268 | Mock(), self.foo_task.name, |
| 269 | args=[2, 4, 8], kwargs={}, |
| 270 | ) |
| 271 | c.update_strategies() |
| 272 | callback = self._get_on_message(c) |
| 273 | callback(m) |
| 274 | |
| 275 | in_bucket = self.buffer.get_nowait() |
| 276 | assert isinstance(in_bucket, Request) |
| 277 | assert in_bucket.name == self.foo_task.name |
| 278 | assert in_bucket.execute() == 2 * 4 * 8 |
| 279 | assert self.timer.empty() |
| 280 | |
| 281 | def test_start_channel_error(self): |
| 282 | def loop_side_effect(): |
nothing calls this directly
no test coverage detected