MCPcopy
hub / github.com/1Panel-dev/MaxKB / get_child_tool_list

Method get_child_tool_list

apps/tools/serializers/tool.py:768–805  ·  view source on GitHub ↗
(self, work_flow, response)

Source from the content-addressed store, hash-verified

766 }
767
def get_child_tool_list(self, work_flow, response):
    """Collect export payloads for every tool referenced by ``work_flow``.

    Tool ids referenced by the workflow are resolved through
    ``get_tool_id_list``; tools whose scope is ``ToolScope.SHARED`` are left
    out. Workflow tools are exported together with their own ``work_flow``
    and are then walked recursively, so nested tool references are collected
    as well. For skill tools, ``code`` is replaced by the base64 text of the
    ``File`` row it points at, when such a row exists.

    Each tool is appended at most once: ids already present in ``response``
    (including ones added by a nested call) are skipped, and a skill tool is
    serialized a single time, after its code has been inlined.

    Args:
        work_flow: Workflow definition handed to ``get_tool_id_list``.
        response: List of already exported tool dicts; extended in place.

    Returns:
        The same ``response`` list that was passed in.
    """
    from application.flow.tools import get_tool_id_list

    # Ids are compared in their text form so a model id and its serialized
    # counterpart match -- assumes both render identically (e.g. a UUID and
    # its str); TODO confirm against ToolExportModelSerializer.
    exported_ids = {str(item.get("id")) for item in response}
    pending_ids = [
        tool_id
        for tool_id in get_tool_id_list(work_flow, False)
        if str(tool_id) not in exported_ids
    ]
    if not pending_ids:
        return response

    # Materialize once: the result is scanned twice below.
    tool_list = list(
        QuerySet(Tool).filter(id__in=pending_ids).exclude(scope=ToolScope.SHARED)
    )

    workflow_tool_ids = [
        tool.id for tool in tool_list if tool.tool_type == ToolType.WORKFLOW
    ]
    work_flow_tool_dict = {}
    if workflow_tool_ids:
        work_flow_tool_dict = {
            tw.tool_id: tw
            for tw in QuerySet(ToolWorkflow).filter(tool_id__in=workflow_tool_ids)
        }

    for tool in tool_list:
        tool_key = str(tool.id)
        if tool_key in exported_ids:
            # Already exported while recursing into an earlier workflow tool.
            continue
        exported_ids.add(tool_key)

        if tool.tool_type == ToolType.WORKFLOW:
            # A workflow tool without a ToolWorkflow row makes .get() return
            # None, so the attribute access raises AttributeError (unchanged
            # from the previous behaviour).
            child_work_flow = work_flow_tool_dict.get(tool.id).work_flow
            response.append(
                {
                    **ToolExportModelSerializer(tool).data,
                    "work_flow": child_work_flow,
                }
            )
            self.get_child_tool_list(child_work_flow, response)
            # Pick up everything the nested call exported.
            exported_ids.update(str(item.get("id")) for item in response)
            continue

        if tool.tool_type == ToolType.SKILL:
            # Inline the skill file before serializing so the tool is
            # exported exactly once, carrying the file content.
            skill_file = QuerySet(File).filter(id=tool.code).first()
            if skill_file:
                tool.code = base64.b64encode(skill_file.get_bytes()).decode("utf-8")

        response.append(ToolExportModelSerializer(tool).data)

    return response
806
807 def export(self):
808 try:

Callers 1

exportMethod · 0.95

Calls 6

get_tool_id_listFunction · 0.90
decodeMethod · 0.80
get_bytesMethod · 0.80
getMethod · 0.45
appendMethod · 0.45

Tested by

no test coverage detected