(
self,
*,
yaml_file: str,
task_prompt: str,
session_id: str,
attachments: Optional[list[str]] = None,
)
| 65 | self.logger = logging.getLogger(__name__) |
| 66 | |
def create_session(
    self,
    *,
    yaml_file: str,
    task_prompt: str,
    session_id: str,
    attachments: Optional[list[str]] = None,
) -> WorkflowSession:
    """Build a ``WorkflowSession``, store it under ``session_id``, and return it.

    All arguments are keyword-only.

    Args:
        yaml_file: Passed through to the session as ``yaml_file``.
        task_prompt: Passed through to the session as ``task_prompt``.
        session_id: Passed to the session and used as its key in
            ``self._sessions``.
        attachments: Optional attachment entries. A fresh list is handed
            to the session as ``task_attachments``; ``None`` (or any
            falsy value) becomes an empty list.

    Returns:
        The newly constructed session object.
    """
    # Copy the input so the session does not share the caller's list object.
    attachment_list = list(attachments) if attachments else []

    created = WorkflowSession(
        session_id=session_id,
        yaml_file=yaml_file,
        task_prompt=task_prompt,
        task_attachments=attachment_list,
    )

    # NOTE(review): no check for an existing entry is made here; if
    # ``_sessions`` is a plain dict, a prior session with the same id is
    # replaced -- confirm this is intended.
    self._sessions[session_id] = created

    self.logger.info("Created session %s for workflow %s", session_id, yaml_file)
    return created
| 84 | |
def get_session(self, session_id: str) -> Optional[WorkflowSession]:
    """Return whatever is stored in ``self._sessions`` for ``session_id``.

    The lookup goes through ``.get``, so a missing id yields that call's
    default (``None`` for a standard mapping) instead of raising.
    """
    registry = self._sessions
    return registry.get(session_id)
no test coverage detected