Construct the initial task input, embedding attachments when available.
(
graph_context: GraphContext,
prompt: str,
attachment_paths: List[str]
)
| 20 | ensure_schema_registry_populated() |
| 21 | |
def build_task_input_payload(
    graph_context: GraphContext,
    prompt: str,
    attachment_paths: List[str],
) -> Union[str, List[Message]]:
    """Build the first task input for a run.

    When ``attachment_paths`` is empty the prompt is handed back unchanged.
    Otherwise the ``code_workspace/attachments`` directory beneath
    ``graph_context.directory`` is created if it does not exist yet, an
    ``AttachmentStore`` is opened on it, and the value produced by
    ``TaskInputBuilder.build_from_file_paths`` for the prompt and the given
    paths is returned.
    """
    if attachment_paths:
        upload_root = graph_context.directory / "code_workspace" / "attachments"
        # parents=True also creates "code_workspace" when it is missing.
        upload_root.mkdir(parents=True, exist_ok=True)
        input_builder = TaskInputBuilder(AttachmentStore(upload_root))
        return input_builder.build_from_file_paths(prompt, attachment_paths)

    # Nothing to embed: the plain prompt string is the whole input.
    return prompt
| 37 | |
| 38 | def parse_arguments(): |
| 39 | parser = argparse.ArgumentParser(description="Run ChatDev_new workflow") |
no test coverage detected