Retrieve and process the command policies for all commands and subcommands. This method traverses through commands and subcommands, extracting policy details from associated data structures and constructing a dictionary of commands with their associated policies. It supports nested data structures and handles both main commands and their subcommands.
(self)
| 550 | raise ValueError(f"Command {command_name} not found in commands") |
| 551 | |
| 552 | async def get_command_policies(self) -> Awaitable[PolicyRecords]: |
| 553 | """ |
| 554 | Retrieve and process the command policies for all commands and subcommands. |
| 555 | |
| 556 | This method traverses through commands and subcommands, extracting policy details |
| 557 | from associated data structures and constructing a dictionary of commands with their |
| 558 | associated policies. It supports nested data structures and handles both main commands |
| 559 | and their subcommands. |
| 560 | |
| 561 | Returns: |
| 562 | PolicyRecords: A collection of commands and subcommands associated with their |
| 563 | respective policies. |
| 564 | |
| 565 | Raises: |
| 566 | IncorrectPolicyType: If an invalid policy type is encountered during policy extraction. |
| 567 | """ |
| 568 | command_with_policies = {} |
| 569 | |
| 570 | def extract_policies(data, module_name, command_name): |
| 571 | """ |
| 572 | Recursively extract policies from nested data structures. |
| 573 | |
| 574 | Args: |
| 575 | data: The data structure to search (can be list, dict, str, bytes, etc.) |
| 576 | command_name: The command name to associate with found policies |
| 577 | """ |
| 578 | if isinstance(data, (str, bytes)): |
| 579 | # Decode bytes to string if needed |
| 580 | policy = str_if_bytes(data.decode()) |
| 581 | |
| 582 | # Check if this is a policy string |
| 583 | if policy.startswith("request_policy") or policy.startswith( |
| 584 | "response_policy" |
| 585 | ): |
| 586 | if policy.startswith("request_policy"): |
| 587 | policy_type = policy.split(":")[1] |
| 588 | |
| 589 | try: |
| 590 | command_with_policies[module_name][ |
| 591 | command_name |
| 592 | ].request_policy = RequestPolicy(policy_type) |
| 593 | except ValueError: |
| 594 | raise IncorrectPolicyType( |
| 595 | f"Incorrect request policy type: {policy_type}" |
| 596 | ) |
| 597 | |
| 598 | if policy.startswith("response_policy"): |
| 599 | policy_type = policy.split(":")[1] |
| 600 | |
| 601 | try: |
| 602 | command_with_policies[module_name][ |
| 603 | command_name |
| 604 | ].response_policy = ResponsePolicy(policy_type) |
| 605 | except ValueError: |
| 606 | raise IncorrectPolicyType( |
| 607 | f"Incorrect response policy type: {policy_type}" |
| 608 | ) |
| 609 |