hub / github.com/celery/celery / await_redis_echo

Function await_redis_echo

t/integration/test_canvas.py:30–56

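Helper to wait for a specified or well-known Redis key to contain a string. It pops entries from the list at redis_key with BLPOP until every message in expected_msgs has been seen, raises TimeoutError if a pop times out first, and finally asserts that the list is empty.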

(expected_msgs, redis_key="redis-echo", timeout=TIMEOUT)
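The expected_msgs parameter accepts either a single message or an iterable of messages. The sketch below mirrors the normalization step from the source so it can be run without Redis. The helper name normalize_expected is hypothetical and is not part of Celery.

```python
import collections


def normalize_expected(expected_msgs):
    """Illustrative helper (not part of Celery): mirror the normalization step."""
    # A lone str/bytes/bytearray is wrapped so it counts as one message
    # instead of being iterated character by character.
    if isinstance(expected_msgs, (str, bytes, bytearray)):
        expected_msgs = (expected_msgs,)
    # str entries are UTF-8 encoded so they compare equal to the bytes
    # payloads a Redis client typically returns.
    return collections.Counter(
        e.encode("utf-8") if isinstance(e, str) else e
        for e in expected_msgs
    )


single = normalize_expected("hello")
several = normalize_expected(["a", b"a", "b"])
```

Because the result is a Counter, duplicates are tracked: "a" and b"a" collapse into the same key with a count of 2, so the helper would wait for that message twice.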

Source from the content-addressed store, hash-verified

def await_redis_echo(expected_msgs, redis_key="redis-echo", timeout=TIMEOUT):
    """
    Helper to wait for a specified or well-known redis key to contain a string.
    """
    redis_connection = get_redis_connection()

    if isinstance(expected_msgs, (str, bytes, bytearray)):
        expected_msgs = (expected_msgs,)
    expected_msgs = collections.Counter(
        e if not isinstance(e, str) else e.encode("utf-8")
        for e in expected_msgs
    )

    # This can technically wait for `len(expected_msg_or_msgs) * timeout` :/
    while +expected_msgs:
        maybe_key_msg = redis_connection.blpop(redis_key, timeout)
        if maybe_key_msg is None:
            raise TimeoutError(
                "Fetching from {!r} timed out - still awaiting {!r}"
                .format(redis_key, dict(+expected_msgs))
            )
        retrieved_key, msg = maybe_key_msg
        assert retrieved_key.decode("utf-8") == redis_key
        expected_msgs[msg] -= 1  # silently accepts unexpected messages

    # There should be no more elements - block momentarily
    assert redis_connection.blpop(redis_key, min(1, timeout)) is None
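The loop condition relies on the unary plus of a Counter, which returns a copy without zero or negative counts. A minimal sketch of that loop follows, using a hypothetical in-memory stand-in (FakeRedis) in place of a real connection. Both FakeRedis and drain are illustrative names, not Celery or redis-py APIs, and the stub returns immediately rather than blocking.

```python
import collections


class FakeRedis:
    """Hypothetical in-memory stand-in for the one method the helper calls."""

    def __init__(self, items):
        self._items = collections.deque(items)

    def blpop(self, key, timeout):
        # A real BLPOP blocks for up to `timeout` seconds; this stub does not.
        if not self._items:
            return None
        return key.encode("utf-8"), self._items.popleft()


def drain(conn, expected, redis_key="redis-echo", timeout=1):
    """Simplified copy of the wait loop, for illustration only."""
    remaining = collections.Counter(expected)
    while +remaining:  # unary plus drops zero and negative counts
        popped = conn.blpop(redis_key, timeout)
        if popped is None:
            raise TimeoutError("still awaiting {!r}".format(dict(+remaining)))
        _key, msg = popped
        remaining[msg] -= 1  # an unexpected msg simply gets a negative count
    return remaining


leftover = drain(FakeRedis([b"x", b"a", b"b"]), [b"a", b"b"])
```

In this run the unexpected b"x" is consumed without error and ends up with a count of -1, which is the behavior the source comment describes as silently accepting unexpected messages. If the list runs dry before every expected message arrives, the sketch raises TimeoutError, as the original does.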

Calls 5

TimeoutError · Class · 0.90
get_redis_connection · Function · 0.85
encode · Method · 0.45
format · Method · 0.45
decode · Method · 0.45

Tested by

no test coverage detected