(self, manager)
| 281 | |
| 282 | @flaky |
| 283 | def test_second_order_replace(self, manager): |
| 284 | if not manager.app.conf.result_backend.startswith('redis'): |
| 285 | raise pytest.skip('Requires redis result backend.') |
| 286 | |
| 287 | redis_connection = get_redis_connection() |
| 288 | redis_connection.delete('redis-echo') |
| 289 | |
| 290 | result = second_order_replace1.delay() |
| 291 | result.get(timeout=TIMEOUT) |
| 292 | redis_messages = list(redis_connection.lrange('redis-echo', 0, -1)) |
| 293 | |
| 294 | expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', |
| 295 | b'Out A'] |
| 296 | assert redis_messages == expected_messages |
| 297 | |
| 298 | @flaky |
| 299 | def test_parent_ids(self, manager, num=10): |
nothing calls this directly
no test coverage detected