hub / github.com/redis/redis-py / get_command_policies

Method get_command_policies

redis/_parsers/commands.py:255–395

Retrieve and process the command policies for all commands and subcommands. This method traverses commands and subcommands, extracts policy details from the associated data structures, and builds a dictionary of commands with their associated policies. It supports nested data structures and handles both main commands and their subcommands.
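To make "policy details" concrete: the source listing on this page recognizes strings of the form `request_policy:<value>` and `response_policy:<value>` and validates the value through an enum. The sketch below shows that idea in isolation. The two enums are local stand-ins, not the redis-py classes; their members are an assumption based on the values listed in the Redis command tips documentation, and `parse_tip` is a hypothetical helper.

```python
from enum import Enum


class RequestPolicy(Enum):
    # Stand-in enum (assumption): values taken from the Redis command tips docs.
    ALL_NODES = "all_nodes"
    ALL_SHARDS = "all_shards"
    MULTI_SHARD = "multi_shard"
    SPECIAL = "special"


class ResponsePolicy(Enum):
    # Stand-in enum (assumption): values taken from the Redis command tips docs.
    ONE_SUCCEEDED = "one_succeeded"
    ALL_SUCCEEDED = "all_succeeded"
    AGG_LOGICAL_AND = "agg_logical_and"
    AGG_LOGICAL_OR = "agg_logical_or"
    AGG_MIN = "agg_min"
    AGG_MAX = "agg_max"
    AGG_SUM = "agg_sum"
    SPECIAL = "special"


def parse_tip(tip):
    """Hypothetical helper: classify one tip string.

    Returns ("request", member) or ("response", member) for policy tips,
    None for any other tip. An unknown policy value raises ValueError,
    because looking up an Enum by value raises ValueError on a miss.
    """
    kind, sep, value = tip.partition(":")
    if not sep:
        return None
    if kind == "request_policy":
        return "request", RequestPolicy(value)
    if kind == "response_policy":
        return "response", ResponsePolicy(value)
    return None
```

The method shown on this page catches that `ValueError` and re-raises it as `IncorrectPolicyType`; the sketch leaves the exception untranslated to stay self-contained.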

(self)

Source from the content-addressed store, hash-verified

        raise ValueError(f"Command {command_name} not found in commands")

    def get_command_policies(self) -> PolicyRecords:
        """
        Retrieve and process the command policies for all commands and subcommands.

        This method traverses through commands and subcommands, extracting policy details
        from associated data structures and constructing a dictionary of commands with their
        associated policies. It supports nested data structures and handles both main commands
        and their subcommands.

        Returns:
            PolicyRecords: A collection of commands and subcommands associated with their
            respective policies.

        Raises:
            IncorrectPolicyType: If an invalid policy type is encountered during policy extraction.
        """
        command_with_policies = {}

        def extract_policies(data, module_name, command_name):
            """
            Recursively extract policies from nested data structures.

            Args:
                data: The data structure to search (can be list, dict, str, bytes, etc.)
                command_name: The command name to associate with found policies
            """
            if isinstance(data, (str, bytes)):
                # Decode bytes to string if needed
                policy = str_if_bytes(data.decode())

                # Check if this is a policy string
                if policy.startswith("request_policy") or policy.startswith(
                    "response_policy"
                ):
                    if policy.startswith("request_policy"):
                        policy_type = policy.split(":")[1]

                        try:
                            command_with_policies[module_name][
                                command_name
                            ].request_policy = RequestPolicy(policy_type)
                        except ValueError:
                            raise IncorrectPolicyType(
                                f"Incorrect request policy type: {policy_type}"
                            )

                    if policy.startswith("response_policy"):
                        policy_type = policy.split(":")[1]

                        try:
                            command_with_policies[module_name][
                                command_name
                            ].response_policy = ResponsePolicy(policy_type)
                        except ValueError:
                            raise IncorrectPolicyType(
                                f"Incorrect response policy type: {policy_type}"
                            )
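The listing stops at the string case, so how the original descends into lists and dicts is not shown here. As a rough, self-contained sketch of the recursive idea described in the docstring, the code below walks a nested structure and records any policy tips it finds. `Policies`, `to_text`, and `collect_policies` are hypothetical names introduced for illustration, and the container handling is an assumption rather than the library's behavior. One detail worth noting: in Python 3 a `str` has no `.decode()` method, so the sketch decodes only `bytes` values and passes `str` through unchanged.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Policies:
    # Hypothetical record; plain strings are stored instead of enum members.
    request_policy: Optional[str] = None
    response_policy: Optional[str] = None


def to_text(value):
    """Decode bytes to str; return str values unchanged."""
    return value.decode() if isinstance(value, bytes) else value


def collect_policies(data: Any, record: Policies) -> Policies:
    """Recursively scan nested data and fill `record` with policy tips."""
    if isinstance(data, (str, bytes)):
        kind, sep, value = to_text(data).partition(":")
        if sep and kind == "request_policy":
            record.request_policy = value
        elif sep and kind == "response_policy":
            record.response_policy = value
    elif isinstance(data, dict):
        # Assumption: both keys and values may hold tips or nested containers.
        for key, value in data.items():
            collect_policies(key, record)
            collect_policies(value, record)
    elif isinstance(data, (list, tuple, set)):
        for item in data:
            collect_policies(item, record)
    # Any other type (int, None, ...) carries no policy and is ignored.
    return record
```

A call such as `collect_policies([b"readonly", [b"request_policy:all_shards"]], Policies())` fills in `request_policy` and leaves `response_policy` as `None`.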

Calls 4

_is_keyless_command · Method · 0.95
CommandPolicies · Class · 0.85
decode · Method · 0.80
get · Method · 0.45