MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / request

Method request

utils/human_prompt.py:81–119  ·  view source on GitHub ↗

Request human input through the configured channel.

(
        self,
        node_id: str,
        task_description: str,
        *,
        inputs: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    )

Source from the content-addressed store, hash-verified

79 self._lock = threading.Lock()
80
def request(
    self,
    node_id: str,
    task_description: str,
    *,
    inputs: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> PromptResult:
    """Ask the configured channel for human input and return the cleaned result.

    Args:
        node_id: Identifier passed to the channel, the human timer and the
            interaction log.
        task_description: Text forwarded to the channel as ``task`` and
            stored in the log details under ``"task_description"``.
        inputs: Optional input text forwarded to the channel and the log.
        metadata: Optional mapping; it is copied, so the caller's object is
            not modified.

    Returns:
        A new ``PromptResult`` holding the sanitized text, the normalized
        blocks and the merged metadata.
    """
    request_meta = dict(metadata) if metadata else {}
    if self._session_id:
        # Keep a caller-provided session id; only fill in a missing one.
        request_meta.setdefault("session_id", self._session_id)

    # The lock is taken first, then the timer is entered, so only one
    # channel request runs at a time through this instance.
    with self._lock, self._log_manager.human_timer(node_id):
        channel_reply = self._channel.request(
            node_id=node_id,
            task=task_description,
            inputs=inputs,
            metadata=request_meta,
        )

    normalized = self._normalize_result(channel_reply)
    clean_text = self._sanitize_response(normalized.text)
    clean_blocks = self._normalize_blocks(normalized.blocks, clean_text)
    # Later keys win: request metadata overrides keys the channel returned.
    merged_meta = {**normalized.metadata, **request_meta}

    log_details = {"task_description": task_description, **merged_meta}
    self._log_manager.record_human_interaction(
        node_id,
        inputs,
        clean_text,
        details=log_details,
    )
    return PromptResult(
        text=clean_text,
        blocks=clean_blocks,
        metadata=merged_meta,
    )
120
121 @staticmethod
122 def _sanitize_response(response: Any) -> str:

Callers

nothing calls this directly

Calls 7

_normalize_result — Method · 0.95
_sanitize_response — Method · 0.95
_normalize_blocks — Method · 0.95
PromptResult — Class · 0.85
human_timer — Method · 0.45
request — Method · 0.45

Tested by

no test coverage detected