hub / github.com/celery/celery / test_second_order_replace

Method test_second_order_replace

t/integration/test_canvas.py:283–296
(self, manager)

Source from the content-addressed store, hash-verified

    @flaky
    def test_second_order_replace(self, manager):
        if not manager.app.conf.result_backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')

        redis_connection = get_redis_connection()
        redis_connection.delete('redis-echo')

        result = second_order_replace1.delay()
        result.get(timeout=TIMEOUT)
        redis_messages = list(redis_connection.lrange('redis-echo', 0, -1))

        expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B',
                             b'Out A']
        assert redis_messages == expected_messages

    @flaky
    def test_parent_ids(self, manager, num=10):
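
The expected message order can be reproduced with a toy model that uses neither Celery nor Redis. The definition of `second_order_replace1` is not shown on this page, so the sketch below assumes a plausible shape: task A logs on entry and replaces itself with "B, then A's exit step"; B does the same around C; C logs once. All names here (`task_a`, `task_b`, `task_c`, `simulate_replace_order`) are hypothetical, and the deque stands in for the worker's scheduling of a replacement chain.

```python
from collections import deque


def task_a(done, log):
    # Hypothetical outer task: logs on entry, then replaces itself with
    # "run B, then run my own exit step".
    if not done:
        log.append(b'In A')
        return [('b', False), ('a', True)]
    log.append(b'Out A')
    return None


def task_b(done, log):
    # Hypothetical middle task: the second-order replacement happens here,
    # inside a chain that is itself a replacement.
    if not done:
        log.append(b'In B')
        return [('c', False), ('b', True)]
    log.append(b'Out B')
    return None


def task_c(done, log):
    # Hypothetical innermost task: a single echo, no replacement.
    log.append(b'In/Out C')
    return None


TASKS = {'a': task_a, 'b': task_b, 'c': task_c}


def simulate_replace_order():
    """Run task A and return the messages in the order they were logged."""
    log = []
    pending = deque([('a', False)])
    while pending:
        name, done = pending.popleft()
        replacement = TASKS[name](done, log)
        if replacement:
            # A replacement chain runs in place of the current task,
            # ahead of anything already waiting.
            pending.extendleft(reversed(replacement))
    return log


if __name__ == '__main__':
    print(simulate_replace_order())
```

Under these assumptions the log nests like a call stack, which is the property the assertion on `expected_messages` checks: each replaced task's exit message appears only after everything it was replaced with has finished.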

Callers

nothing calls this directly

Calls 5

get_redis_connection · Function · 0.85
lrange · Method · 0.80
delete · Method · 0.45
delay · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected