Parses a pub/sub message. If the channel or pattern was subscribed to with a message handler, that handler is invoked instead of returning the parsed message.
(self, response, ignore_subscribe_messages=False)
| 1493 | return self.execute_command(*args) |
| 1494 | |
| 1495 | def handle_message(self, response, ignore_subscribe_messages=False): |
| 1496 | """ |
| 1497 | Parses a pub/sub message. If the channel or pattern was subscribed to |
| 1498 | with a message handler, the handler is invoked instead of a parsed |
| 1499 | message being returned. |
| 1500 | """ |
| 1501 | if response is None: |
| 1502 | return None |
| 1503 | if isinstance(response, bytes): |
| 1504 | response = [b"pong", response] if response != b"PONG" else [b"pong", b""] |
| 1505 | |
| 1506 | message_type = str_if_bytes(response[0]) |
| 1507 | if message_type == "pmessage": |
| 1508 | message = { |
| 1509 | "type": message_type, |
| 1510 | "pattern": response[1], |
| 1511 | "channel": response[2], |
| 1512 | "data": response[3], |
| 1513 | } |
| 1514 | elif message_type == "pong": |
| 1515 | message = { |
| 1516 | "type": message_type, |
| 1517 | "pattern": None, |
| 1518 | "channel": None, |
| 1519 | "data": response[1], |
| 1520 | } |
| 1521 | else: |
| 1522 | message = { |
| 1523 | "type": message_type, |
| 1524 | "pattern": None, |
| 1525 | "channel": response[1], |
| 1526 | "data": response[2], |
| 1527 | } |
| 1528 | |
| 1529 | if message_type in ["message", "pmessage"]: |
| 1530 | channel = str_if_bytes(message["channel"]) |
| 1531 | record_pubsub_message( |
| 1532 | direction=PubSubDirection.RECEIVE, |
| 1533 | channel=channel, |
| 1534 | ) |
| 1535 | elif message_type == "smessage": |
| 1536 | channel = str_if_bytes(message["channel"]) |
| 1537 | record_pubsub_message( |
| 1538 | direction=PubSubDirection.RECEIVE, |
| 1539 | channel=channel, |
| 1540 | sharded=True, |
| 1541 | ) |
| 1542 | |
| 1543 | # if this is an unsubscribe message, remove it from memory |
| 1544 | if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: |
| 1545 | if message_type == "punsubscribe": |
| 1546 | pattern = response[1] |
| 1547 | if pattern in self.pending_unsubscribe_patterns: |
| 1548 | self.pending_unsubscribe_patterns.remove(pattern) |
| 1549 | self.patterns.pop(pattern, None) |
| 1550 | elif message_type == "sunsubscribe": |
| 1551 | s_channel = response[1] |
| 1552 | if s_channel in self.pending_unsubscribe_shard_channels: |