MCPcopy
hub / github.com/OpenBMB/ChatDev / _run_workflow_with_logger

Function _run_workflow_with_logger

server/routes/execute_sync.py:69–144  ·  view source on GitHub ↗
(
    *,
    yaml_file: Union[str, Path],
    task_prompt: str,
    attachments: Optional[Sequence[Union[str, Path]]],
    session_name: Optional[str],
    variables: Optional[dict],
    log_level: Optional[LogLevel],
    log_callback,
)

Source from the content-addressed store, hash-verified

67
68
69def _run_workflow_with_logger(
70 *,
71 yaml_file: Union[str, Path],
72 task_prompt: str,
73 attachments: Optional[Sequence[Union[str, Path]]],
74 session_name: Optional[str],
75 variables: Optional[dict],
76 log_level: Optional[LogLevel],
77 log_callback,
78) -> tuple[Optional[Message], dict[str, Any]]:
79 ensure_schema_registry_populated()
80
81 yaml_path = _resolve_yaml_path(yaml_file)
82 if not yaml_path.exists():
83 raise FileNotFoundError(f"YAML file not found: {yaml_path}")
84
85 attachments = attachments or []
86 if (not task_prompt or not task_prompt.strip()) and not attachments:
87 raise ValidationError(
88 "Task prompt cannot be empty",
89 details={"task_prompt_provided": bool(task_prompt)},
90 )
91
92 design = load_config(yaml_path, vars_override=variables)
93 normalized_session = _normalize_session_name(yaml_path, session_name)
94
95 graph_config = GraphConfig.from_definition(
96 design.graph,
97 name=normalized_session,
98 output_root=OUTPUT_ROOT,
99 source_path=str(yaml_path),
100 vars=design.vars,
101 )
102
103 if log_level:
104 graph_config.log_level = log_level
105 graph_config.definition.log_level = log_level
106
107 graph_context = GraphContext(config=graph_config)
108 task_input = _build_task_input(graph_context, task_prompt, attachments)
109
110 class _StreamingWorkflowLogger(WorkflowLogger):
111 def add_log(self, *args, **kwargs):
112 entry = super().add_log(*args, **kwargs)
113 if entry:
114 payload = entry.to_dict()
115 payload.pop("details", None)
116 log_callback("log", payload)
117 return entry
118
119 class _StreamingExecutor(GraphExecutor):
120 def _create_logger(self) -> WorkflowLogger:
121 level = log_level or self.graph.log_level
122 return _StreamingWorkflowLogger(
123 self.graph.name,
124 level,
125 use_structured_logging=True,
126 log_to_console=False,

Callers 1

worker (Function) · 0.85

Calls 13

ValidationError (Class) · 0.90
load_config (Function) · 0.90
GraphContext (Class) · 0.90
_StreamingExecutor (Class) · 0.85
from_definition (Method) · 0.80
get_logger (Method) · 0.80
get_token_usage (Method) · 0.80
_resolve_yaml_path (Function) · 0.70
_normalize_session_name (Function) · 0.70
_build_task_input (Function) · 0.70

Tested by

no test coverage detected