Execute a publish/subscribe command
(self, *args)
| 1074 | return self.subscribed_event.is_set() |
| 1075 | |
def execute_command(self, *args):
    """
    Send a publish/subscribe command on this object's connection.

    A connection is taken from ``self.connection_pool`` the first time
    this is called (i.e. while ``self.connection`` is ``None``) and is
    then reused on later calls.

    The reply is intentionally NOT read here: if the connection is
    already subscribed to one or more channels, parsing a response in
    this function could pull a legitimate published message off the
    stack instead of the command's reply.

    :param args: the command name followed by its arguments, passed
        through unchanged to ``connection.send_command``.
    """
    if self.connection is None:
        self.connection = self.connection_pool.get_connection()
        # On every (re)connect, ``on_connect`` gets called so that the
        # channels we were listening to before a disconnect are
        # subscribed to again.
        self.connection.register_connect_callback(self.on_connect)
        if self.push_handler_func is not None:
            self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
        # Tell event listeners that a pub/sub connection now exists.
        dispatch = self._event_dispatcher.dispatch
        instantiated = AfterPubSubConnectionInstantiationEvent(
            self.connection, self.connection_pool, ClientType.SYNC, self._lock
        )
        dispatch(instantiated)

    conn = self.connection
    # A health check is requested only while we are not subscribed.
    send_options = {"check_health": not self.subscribed}
    if not self.subscribed:
        self.clean_health_check_responses()
    # The actual send happens while holding the pub/sub lock.
    with self._lock:
        self._execute(conn, conn.send_command, *args, **send_options)
| 1101 | |
| 1102 | def clean_health_check_responses(self) -> None: |
| 1103 | """ |
no test coverage detected