(self, *args, **kwargs)
| 4151 | ) as execute_command_mock: |
| 4152 | |
async def execute_command(self, *args, **kwargs):
    """Return a canned reply selected by the first positional argument.

    Installed as the ``side_effect`` of ``execute_command_mock``.  ``self``
    and ``kwargs`` are accepted but never read.

    Replies:
        "INFO"          -> ``{"cluster_enabled": True}``
        "CLUSTER SLOTS" -> one slot range 0-16383 whose node entry is
                           ``[ssl_host, ssl_port, "ssl_node"]``
        "COMMAND"       -> a table holding only the ``ping`` entry

    Raises:
        NotImplementedError: for any other first argument.
    """
    command = args[0]

    if command == "INFO":
        reply = {"cluster_enabled": True}
    elif command == "CLUSTER SLOTS":
        # ssl_host / ssl_port are free names resolved outside this function.
        node = [ssl_host, ssl_port, "ssl_node"]
        reply = [[0, 16383, node]]
    elif command == "COMMAND":
        ping_spec = {
            "name": "ping",
            "arity": -1,
            "flags": ["stale", "fast"],
            "first_key_pos": 0,
            "last_key_pos": 0,
            "step_count": 0,
        }
        reply = {"ping": ping_spec}
    else:
        raise NotImplementedError()

    return reply
| 4170 | |
| 4171 | execute_command_mock.side_effect = execute_command |
| 4172 |
no outgoing calls
no test coverage detected