(
pubsub, timeout=0.5, ignore_subscribe_messages=False, node=None, func=None
)
| 30 | |
| 31 | |
| 32 | def wait_for_message( |
| 33 | pubsub, timeout=0.5, ignore_subscribe_messages=False, node=None, func=None |
| 34 | ): |
| 35 | now = time.monotonic() |
| 36 | timeout = now + timeout |
| 37 | while now < timeout: |
| 38 | if node: |
| 39 | message = pubsub.get_sharded_message( |
| 40 | ignore_subscribe_messages=ignore_subscribe_messages, target_node=node |
| 41 | ) |
| 42 | elif func: |
| 43 | message = func(ignore_subscribe_messages=ignore_subscribe_messages) |
| 44 | else: |
| 45 | message = pubsub.get_message( |
| 46 | ignore_subscribe_messages=ignore_subscribe_messages |
| 47 | ) |
| 48 | if message is not None: |
| 49 | return message |
| 50 | time.sleep(0.01) |
| 51 | now = time.monotonic() |
| 52 | return None |
| 53 | |
| 54 | |
| 55 | def make_message(type, channel, data, pattern=None): |
no test coverage detected