Upload an object to the V4 API. Args: body: The object body to upload Returns: Dictionary containing upload response data Raises: ApiServerException: If the upload fails
(self, body: Union[str, bytes])
| 69 | return HttpClient.get_session().post(url, json={"body": body}, headers=request_headers, timeout=30) |
| 70 | |
def upload_object(self, body: Union[str, bytes]) -> Dict[str, Any]:
    """
    Upload an object to the V4 API.

    Args:
        body: The object body to upload. ``bytes`` input is decoded as
            UTF-8 before sending; ``str`` input is sent unchanged.

    Returns:
        Dictionary containing upload response data (whatever
        ``response.json()`` yields for the 200 response).

    Raises:
        ApiServerException: If the request raises a ``requests``
            exception, the server answers with a non-200 status, or the
            200 response body cannot be parsed.
        UnicodeDecodeError: If ``body`` is bytes that are not valid UTF-8.
    """
    # Normalize bytes to str so the request helper always receives text.
    # NOTE(review): post() appears to send the body inside a JSON payload,
    # which cannot carry raw bytes -- confirm against post().
    if isinstance(body, bytes):
        body = body.decode("utf-8")

    # Only the HTTP call itself can raise a transport-level error, so the
    # try block is limited to it.
    try:
        response = self.post("/v4/objects/upload/", body, self.prepare_headers())
    except requests.exceptions.RequestException as e:
        raise ApiServerException(f"Failed to upload object: {e}") from e

    if response.status_code != 200:
        error_msg = f"Upload failed: {response.status_code}"
        # Best effort: prefer the server-supplied "error" field, but fall
        # back to the generic message when the body is not usable JSON.
        # `except Exception` (rather than a bare except) lets
        # KeyboardInterrupt / SystemExit propagate.
        try:
            error_data = response.json()
        except Exception:
            error_data = None
        # Non-dict payloads (lists, strings, numbers) carry no "error" key.
        if isinstance(error_data, dict) and "error" in error_data:
            error_msg = error_data["error"]
        raise ApiServerException(error_msg)

    try:
        return response.json()
    except Exception as e:
        # Chain the cause so the parse failure stays visible in tracebacks.
        raise ApiServerException(f"Failed to process upload response: {e}") from e
| 107 | |
| 108 | def upload_logfile(self, body: Union[str, bytes], trace_id: str) -> Dict[str, Any]: |
| 109 | """ |