(
self,
session_id: str,
yaml_file: str,
task: BatchTask,
task_dir: str,
log_level: Optional[LogLevel],
)
| 190 | json.dump(result_rows, handle, ensure_ascii=True, indent=2) |
| 191 | |
def _run_single_task(
    self,
    session_id: str,
    yaml_file: str,
    task: BatchTask,
    task_dir: str,
    log_level: Optional[LogLevel],
) -> Dict[str, Any]:
    """Execute one batch task against the graph described by ``yaml_file``.

    The YAML path is resolved through ``self._resolve_yaml_path`` and loaded
    with ``load_config``, passing ``task.vars_override`` (or ``None`` when it
    is falsy) as the variable overrides. A ``GraphConfig`` named ``task_dir``
    is then built with its output root at
    ``WARE_HOUSE_DIR / "session_<session_id>"`` and executed by a
    ``GraphExecutor``.

    Args:
        session_id: Used for the output-root directory name and passed to
            the executor.
        yaml_file: Graph definition file, as accepted by
            ``self._resolve_yaml_path``.
        task: The batch task; supplies ``vars_override`` and the input
            built by ``self._build_task_input``.
        task_dir: Passed as ``name`` to ``GraphConfig.from_definition``.
        log_level: When truthy, copied onto both the graph config and its
            definition; otherwise the loaded values are left alone.

    Returns:
        A dict with the keys ``"results"``, ``"token_usage"``,
        ``"duration_ms"`` and ``"graph_output"``.

    Raises:
        ValidationError: If any node in the loaded graph has type
            ``"human"``.
    """
    resolved_yaml = self._resolve_yaml_path(yaml_file)
    design = load_config(resolved_yaml, vars_override=task.vars_override or None)

    # Reject graphs containing a "human" node before any execution starts.
    for node in design.graph.nodes:
        if node.type == "human":
            raise ValidationError(
                "Batch execution does not support human nodes",
                details={"yaml_file": yaml_file},
            )

    session_root = WARE_HOUSE_DIR / f"session_{session_id}"
    config = GraphConfig.from_definition(
        design.graph,
        name=task_dir,
        output_root=session_root,
        source_path=str(resolved_yaml),
        vars=design.vars,
    )
    config.metadata["fixed_output_dir"] = True

    if log_level:
        # Set the level on both the config and its definition.
        config.log_level = log_level
        config.definition.log_level = log_level

    context = GraphContext(config=config)

    # The timed span covers executor construction, input building and the run.
    started = time.perf_counter()
    executor = GraphExecutor(context, session_id=session_id)
    payload = self._build_task_input(executor.attachment_store, task)
    executor._execute(payload)
    elapsed_seconds = time.perf_counter() - started
    duration_ms = int(elapsed_seconds * 1000)

    results = executor.outputs
    token_usage = executor.token_tracker.get_token_usage()
    graph_output = executor.get_final_output()
    return {
        "results": results,
        "token_usage": token_usage,
        "duration_ms": duration_ms,
        "graph_output": graph_output,
    }
| 236 | |
| 237 | @staticmethod |
| 238 | def _build_task_input(attachment_store, task: BatchTask): |
nothing calls this directly
no test coverage detected