Encode a request message.

Args:
    request_id: Unique request identifier (uint64)
    app_path: Import path of the dirty app
    action: Action to call on the app
    args: Positional arguments
    kwargs: Keyword arguments

Returns:
    bytes: Complete message (header + payload)

encode_request(request_id: int, app_path: str, action: str, args: tuple = None, kwargs: dict = None) -> bytes
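
The header + payload framing described above can be illustrated with a small, self-contained sketch. Everything wire-level here is an assumption made for illustration: the `>BQI` header layout, the `MSG_TYPE_REQUEST` value of `0x01`, the use of JSON as a stand-in for `TLVEncoder.encode`, and the `sketch_*` helper names and `myapp.tasks:Worker` app path are all hypothetical. It is not the library's actual format.

```python
import json
import struct

# Hypothetical values: the real MSG_TYPE_REQUEST constant and the real header
# layout are not shown in this section.
MSG_TYPE_REQUEST = 0x01
HEADER_FORMAT = ">BQI"  # message type (uint8), request id (uint64), payload length (uint32)
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)


def sketch_encode_request(request_id, app_path, action, args=None, kwargs=None):
    """Build header + payload in the same order as encode_request."""
    payload_dict = {
        "app_path": app_path,
        "action": action,
        "args": list(args) if args else [],
        "kwargs": kwargs or {},
    }
    # JSON stands in for TLVEncoder.encode, whose format is not shown here.
    payload = json.dumps(payload_dict).encode("utf-8")
    header = struct.pack(HEADER_FORMAT, MSG_TYPE_REQUEST, request_id, len(payload))
    return header + payload


def sketch_decode_request(message):
    """Split a sketch message back into (message type, request id, payload dict)."""
    msg_type, request_id, length = struct.unpack(HEADER_FORMAT, message[:HEADER_SIZE])
    body = message[HEADER_SIZE:HEADER_SIZE + length]
    return msg_type, request_id, json.loads(body.decode("utf-8"))


message = sketch_encode_request(42, "myapp.tasks:Worker", "run", args=(1, 2))
msg_type, request_id, payload = sketch_decode_request(message)
```

Because the payload length travels in the header, a reader of this sketch format can consume exactly `HEADER_SIZE` bytes first and then know how many more bytes belong to the message.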

    @staticmethod
    def encode_request(request_id: int, app_path: str, action: str,
                       args: tuple = None, kwargs: dict = None) -> bytes:
        """
        Encode a request message.

        Args:
            request_id: Unique request identifier (uint64)
            app_path: Import path of the dirty app
            action: Action to call on the app
            args: Positional arguments
            kwargs: Keyword arguments

        Returns:
            bytes: Complete message (header + payload)
        """
        # Normalize missing args/kwargs to empty containers before encoding.
        payload_dict = {
            "app_path": app_path,
            "action": action,
            "args": list(args) if args else [],
            "kwargs": kwargs or {},
        }
        payload = TLVEncoder.encode(payload_dict)
        # The header is built from the message type, request id, and payload length.
        header = BinaryProtocol.encode_header(MSG_TYPE_REQUEST, request_id,
                                              len(payload))
        return header + payload

    @staticmethod
    def encode_response(request_id: int, result) -> bytes: