hub / github.com/redis/redis-py / wait_for_message

Function wait_for_message

tests/test_asyncio/test_pubsub.py:46–57
(pubsub, timeout=0.2, ignore_subscribe_messages=False)

Source from the content-addressed store, hash-verified

async def wait_for_message(pubsub, timeout=0.2, ignore_subscribe_messages=False):
    now = asyncio.get_running_loop().time()
    timeout = now + timeout
    while now < timeout:
        message = await pubsub.get_message(
            ignore_subscribe_messages=ignore_subscribe_messages
        )
        if message is not None:
            return message
        await asyncio.sleep(0.01)
        now = asyncio.get_running_loop().time()
    return None
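
The polling behaviour of the helper can be exercised without a Redis server. The sketch below is illustrative only: it repeats the helper so the block runs on its own, and FakePubSub, demo, and the sample message dict are hypothetical names made up for this example, not part of redis-py or its test suite. It assumes only that the pubsub object exposes an awaitable get_message accepting ignore_subscribe_messages, as the listing does.

```python
import asyncio


async def wait_for_message(pubsub, timeout=0.2, ignore_subscribe_messages=False):
    # Repeated from the listing so this sketch is self-contained.
    now = asyncio.get_running_loop().time()
    timeout = now + timeout  # absolute deadline on the loop's clock
    while now < timeout:
        message = await pubsub.get_message(
            ignore_subscribe_messages=ignore_subscribe_messages
        )
        if message is not None:
            return message
        await asyncio.sleep(0.01)  # yield to the loop between polls
        now = asyncio.get_running_loop().time()
    return None


class FakePubSub:
    """Hypothetical stand-in for a pubsub object; not a redis-py class."""

    def __init__(self, empty_polls, message):
        self.empty_polls = empty_polls  # number of polls that return None first
        self.message = message
        self.calls = 0

    async def get_message(self, ignore_subscribe_messages=False):
        self.calls += 1
        if self.calls <= self.empty_polls:
            return None
        return self.message


async def demo():
    # Case 1: a message becomes available on the fourth poll.
    ready = FakePubSub(empty_polls=3, message={"type": "message", "data": b"hi"})
    got = await wait_for_message(ready)

    # Case 2: nothing ever arrives, so the helper gives up at the deadline.
    silent = FakePubSub(empty_polls=10**9, message=None)
    missed = await wait_for_message(silent, timeout=0.05)
    return got, ready.calls, missed


got, calls, missed = asyncio.run(demo())
```

In the first case the helper returns as soon as get_message yields a non-None value. In the second it keeps polling roughly every 0.01 seconds until the deadline passes, then returns None.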

Calls 4

get_running_loop · Method · 0.80
time · Method · 0.45
get_message · Method · 0.45
sleep · Method · 0.45

Tested by

no test coverage detected