Helper method to wait for a message with timeout. Args: pubsub: The PubSub instance; timeout: Timeout in seconds; ignore_subscribe_messages: Whether to ignore subscribe messages; sharded: If True, use get_sharded_message() instead of get_message().
(
self, pubsub, timeout=0.2, ignore_subscribe_messages=False, sharded=False
)
| 4690 | """ |
| 4691 | |
async def wait_for_message(
    self, pubsub, timeout=0.2, ignore_subscribe_messages=False, sharded=False
):
    """Poll ``pubsub`` until a message arrives or ``timeout`` elapses.

    Args:
        pubsub: The PubSub instance to poll.
        timeout: Overall polling budget in seconds.
        ignore_subscribe_messages: Forwarded unchanged to the getter.
        sharded: If True, poll with ``get_sharded_message()`` (passing a
            0.01 s per-call timeout) instead of ``get_message()``.

    Returns:
        The first non-``None`` message obtained, or ``None`` once the
        deadline has passed. When ``timeout`` is not positive no poll is
        made at all.
    """
    import asyncio

    clock = asyncio.get_running_loop().time

    # Only the sharded getter is given an explicit per-call timeout.
    poll_kwargs = {"ignore_subscribe_messages": ignore_subscribe_messages}
    if sharded:
        poll_kwargs["timeout"] = 0.01

    current = clock()
    deadline = current + timeout
    while current < deadline:
        # Resolve the getter lazily so ``pubsub`` is untouched when the
        # loop body never runs.
        getter = pubsub.get_sharded_message if sharded else pubsub.get_message
        received = await getter(**poll_kwargs)
        if received is not None:
            return received
        # Short pause between polls, then refresh the clock reading.
        await asyncio.sleep(0.01)
        current = clock()
    return None
| 4722 | |
| 4723 | def make_message(self, type, channel, data, pattern=None): |
| 4724 | """Helper method to create expected message format""" |
no test coverage detected