(self)
| 245 | assert not C.event_sent() |
| 246 | |
| 247 | def test_eta_task(self): |
| 248 | with self._context(self.add.s(2, 2).set(countdown=10)) as C: |
| 249 | C() |
| 250 | assert C.was_scheduled() |
| 251 | C.consumer.qos.increment_eventually.assert_called_with() |
| 252 | |
| 253 | def test_eta_task_utc_disabled(self): |
| 254 | with self._context(self.add.s(2, 2).set(countdown=10), utc=False) as C: |
nothing calls this directly
no test coverage detected