MCPcopy
hub / github.com/opentrace/opentrace / ToolDispatcher

Class ToolDispatcher

agent/src/opentrace_agent/benchmarks/agent.py:239–304  ·  view source on GitHub ↗

Routes Claude tool calls to the appropriate handler.

Source from the content-addressed store, hash-verified

237
238
239class ToolDispatcher:
240 """Routes Claude tool calls to the appropriate handler."""
241
242 def __init__(self, repo_path: Path, mcp_tools: dict[str, Any] | None = None) -> None:
243 self._repo_path = repo_path.resolve()
244 self._mcp_tools = mcp_tools # MCP tool functions from GraphAccuracyBenchmark style
245 self._patch: str | None = None
246
247 @property
248 def patch(self) -> str | None:
249 return self._patch
250
251 def dispatch(self, name: str, input_args: dict[str, Any]) -> str:
252 """Execute a tool and return the result as a string."""
253 try:
254 if name == "read_file":
255 return self._read_file(input_args["path"])
256 elif name == "list_directory":
257 return self._list_directory(input_args.get("path", "."))
258 elif name == "generate_patch":
259 self._patch = input_args["patch"]
260 return "Patch submitted successfully."
261 elif name in ("search_graph", "list_nodes", "get_node", "traverse_graph", "get_stats"):
262 return self._call_mcp(name, input_args)
263 else:
264 return json.dumps({"error": f"Unknown tool: {name}"})
265 except Exception as e:
266 return json.dumps({"error": f"{type(e).__name__}: {e}"})
267
268 def _read_file(self, rel_path: str) -> str:
269 target = (self._repo_path / rel_path).resolve()
270 # Security: prevent path traversal
271 if not str(target).startswith(str(self._repo_path)):
272 return json.dumps({"error": "Path traversal not allowed"})
273 if not target.is_file():
274 return json.dumps({"error": f"File not found: {rel_path}"})
275 try:
276 content = target.read_text(errors="replace")
277 # Truncate very large files
278 if len(content) > 50_000:
279 content = content[:50_000] + "\n...[truncated at 50000 chars]"
280 return content
281 except Exception as e:
282 return json.dumps({"error": f"Read error: {e}"})
283
284 def _list_directory(self, rel_path: str) -> str:
285 target = (self._repo_path / rel_path).resolve()
286 if not str(target).startswith(str(self._repo_path)):
287 return json.dumps({"error": "Path traversal not allowed"})
288 if not target.is_dir():
289 return json.dumps({"error": f"Directory not found: {rel_path}"})
290 entries = []
291 for child in sorted(target.iterdir()):
292 if child.name.startswith("."):
293 continue
294 kind = "dir" if child.is_dir() else "file"
295 entries.append({"name": child.name, "type": kind})
296 return json.dumps(entries)

Callers 11

test_read_fileMethod · 0.90
test_list_directoryMethod · 0.90
test_generate_patchMethod · 0.90
test_unknown_toolMethod · 0.90
agent_fnFunction · 0.85

Calls

no outgoing calls

Tested by 10

test_read_fileMethod · 0.72
test_list_directoryMethod · 0.72
test_generate_patchMethod · 0.72
test_unknown_toolMethod · 0.72