Helper to wait for a specified or well-known redis key to contain a string.
(expected_msgs, redis_key="redis-echo", timeout=TIMEOUT)
| 28 | |
| 29 | |
def await_redis_echo(expected_msgs, redis_key="redis-echo", timeout=TIMEOUT):
    """
    Helper to wait for a specified or well-known redis key to contain a string.

    Pops entries from the list stored at ``redis_key`` until every expected
    message has been received the expected number of times, then checks that
    the list yields nothing further.

    :param expected_msgs: a single message (``str``, ``bytes`` or
        ``bytearray``) or an iterable of such messages. Duplicates are
        counted, so a message listed twice must be received twice.
    :param redis_key: name of the redis list to pop from.
    :param timeout: timeout handed to each individual ``blpop`` call.
    :raises TimeoutError: if a ``blpop`` call returns nothing while messages
        are still outstanding.
    :raises AssertionError: if an entry is returned under a different key, or
        if another entry is still available once all expected messages have
        been received.
    """
    redis_connection = get_redis_connection()

    def _as_bytes(msg):
        # Normalise to immutable bytes: `str` is UTF-8 encoded, and
        # `bytearray` (which is unhashable and so cannot be a Counter key)
        # is copied into `bytes`. Anything else is passed through untouched.
        if isinstance(msg, str):
            return msg.encode("utf-8")
        if isinstance(msg, bytearray):
            return bytes(msg)
        return msg

    # A lone message is wrapped so it is counted as one item rather than
    # being iterated character by character / byte by byte.
    if isinstance(expected_msgs, (str, bytes, bytearray)):
        expected_msgs = (expected_msgs,)
    expected_msgs = collections.Counter(_as_bytes(e) for e in expected_msgs)

    # Unary plus drops zero and negative counts, so the loop ends once no
    # message has a positive outstanding count.
    # This can technically wait for `len(expected_msgs) * timeout` :/
    while +expected_msgs:
        maybe_key_msg = redis_connection.blpop(redis_key, timeout)
        if maybe_key_msg is None:
            raise TimeoutError(
                "Fetching from {!r} timed out - still awaiting {!r}"
                .format(redis_key, dict(+expected_msgs))
            )
        retrieved_key, msg = maybe_key_msg
        assert retrieved_key.decode("utf-8") == redis_key
        # An unexpected message just drives its count negative, which the
        # unary plus above ignores - i.e. it is silently accepted.
        expected_msgs[msg] -= 1

    # There should be no more elements - block momentarily
    assert redis_connection.blpop(redis_key, min(1, timeout)) is None
| 57 | |
| 58 | |
| 59 | def await_redis_list_message_length(expected_length, redis_key="redis-group-ids", timeout=TIMEOUT): |
no test coverage detected