MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / _write_batch_outputs

Method _write_batch_outputs

server/services/batch_run_service.py:162–190  ·  view source on GitHub ↗
(self, session_id: str, result_rows: List[Dict[str, Any]])

Source from the content-addressed store, hash-verified

160 )
161
162 def _write_batch_outputs(self, session_id: str, result_rows: List[Dict[str, Any]]) -> None:
163 output_root = WARE_HOUSE_DIR / f"session_{session_id}"
164 output_root.mkdir(parents=True, exist_ok=True)
165
166 csv_path = output_root / "batch_results.csv"
167 json_path = output_root / "batch_manifest.json"
168
169 fieldnames = [
170 "row_index",
171 "task_id",
172 "task_dir",
173 "status",
174 "duration_ms",
175 "token_usage",
176 "results",
177 "error",
178 ]
179
180 with csv_path.open("w", newline="", encoding="utf-8") as handle:
181 writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore")
182 writer.writeheader()
183 for row in result_rows:
184 row_copy = dict(row)
185 row_copy["token_usage"] = json.dumps(row_copy.get("token_usage"))
186 row_copy["results"] = row_copy.get("graph_output", "")
187 writer.writerow(row_copy)
188
189 with json_path.open("w", encoding="utf-8") as handle:
190 json.dump(result_rows, handle, ensure_ascii=True, indent=2)
191
192 def _run_single_task(
193 self,

Callers 1

run_batchMethod · 0.95

Calls 1

getMethod · 0.45

Tested by

no test coverage detected