Process pub/sub messages using registered callbacks. This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in redis-py, but it is a coroutine. To launch it as a separate task, use ``asyncio.create_task``, for example ``task = asyncio.create_task(pubsub.run())``. To shut it down, use asyncio cancellation: ``task.cancel()`` followed by ``await task``.
(
self,
*,
exception_handler: Optional["PSWorkerThreadExcHandlerT"] = None,
poll_timeout: float = 1.0,
pubsub=None,
)
| 1565 | return message |
| 1566 | |
async def run(
    self,
    *,
    exception_handler: Optional["PSWorkerThreadExcHandlerT"] = None,
    poll_timeout: float = 1.0,
    pubsub=None,
) -> None:
    """Run the pub/sub polling loop, dispatching to registered callbacks.

    Coroutine counterpart of :py:meth:`redis.PubSub.run_in_thread` from
    redis-py. It loops forever, so it is normally scheduled as its own
    task:

    >>> task = asyncio.create_task(pubsub.run())

    Stop it through asyncio cancellation:

    >>> task.cancel()
    >>> await task

    ``asyncio.CancelledError`` is always re-raised by this coroutine and is
    never handed to ``exception_handler``.

    :param exception_handler: optional callable invoked as
        ``exception_handler(exc, self)`` when polling raises anything other
        than ``asyncio.CancelledError``. If it returns an awaitable, that
        awaitable is awaited. When omitted, the exception propagates and
        ends the loop.
    :param poll_timeout: forwarded as ``timeout`` to every ``get_message``
        call.
    :param pubsub: optional object whose ``get_message`` is polled instead
        of this instance's. Handler validation and ``connect()`` are still
        performed on ``self``.
    :raises PubSubError: if any entry in ``self.channels`` or
        ``self.patterns`` has ``None`` as its handler.
    """
    # Validate up front: every subscription must have a callback, because
    # this loop discards the return value of ``get_message``.
    for channel, callback in self.channels.items():
        if callback is not None:
            continue
        raise PubSubError(f"Channel: '{channel}' has no handler registered")
    for pattern, callback in self.patterns.items():
        if callback is not None:
            continue
        raise PubSubError(f"Pattern: '{pattern}' has no handler registered")

    await self.connect()

    # ``pubsub`` never changes during the loop, so pick the polled object once.
    source = self if pubsub is None else pubsub

    while True:
        try:
            await source.get_message(
                ignore_subscribe_messages=True, timeout=poll_timeout
            )
        except asyncio.CancelledError:
            # Cancellation is the shutdown signal; never route it to the handler.
            raise
        except BaseException as exc:
            if exception_handler is None:
                raise
            outcome = exception_handler(exc, self)
            # The handler may be a plain function or a coroutine function.
            if inspect.isawaitable(outcome):
                await outcome
        # Ensure that other tasks on the event loop get a chance to run
        # if we didn't have to block for I/O anywhere.
        await asyncio.sleep(0)
| 1616 | |
| 1617 | |
| 1618 | class PubsubWorkerExceptionHandler(Protocol): |
no test coverage detected