MCPcopy
hub / github.com/AgentOps-AI/agentops / upload_object

Method upload_object

agentops/client/api/versions/v4.py:71–106  ·  view source on GitHub ↗

Upload an object to the V4 API. Args: body: The object body to upload Returns: Dictionary containing upload response data Raises: ApiServerException: If the upload fails

(self, body: Union[str, bytes])

Source from the content-addressed store, hash-verified

69 return HttpClient.get_session().post(url, json={"body": body}, headers=request_headers, timeout=30)
70
71 def upload_object(self, body: Union[str, bytes]) -> Dict[str, Any]:
72 """
73 Upload an object to the V4 API.
74
75 Args:
76 body: The object body to upload
77
78 Returns:
79 Dictionary containing upload response data
80
81 Raises:
82 ApiServerException: If the upload fails
83 """
84 try:
85 # Convert bytes to string for consistency with test expectations
86 if isinstance(body, bytes):
87 body = body.decode("utf-8")
88
89 response = self.post("/v4/objects/upload/", body, self.prepare_headers())
90
91 if response.status_code != 200:
92 error_msg = f"Upload failed: {response.status_code}"
93 try:
94 error_data = response.json()
95 if "error" in error_data:
96 error_msg = error_data["error"]
97 except:
98 pass
99 raise ApiServerException(error_msg)
100
101 try:
102 return response.json()
103 except Exception as e:
104 raise ApiServerException(f"Failed to process upload response: {str(e)}")
105 except requests.exceptions.RequestException as e:
106 raise ApiServerException(f"Failed to upload object: {e}")
107
108 def upload_logfile(self, body: Union[str, bytes], trace_id: str) -> Dict[str, Any]:
109 """

Calls 5

postMethod · 0.95
prepare_headersMethod · 0.95
ApiServerExceptionClass · 0.90
decodeMethod · 0.80
jsonMethod · 0.80