Update the memory store with the latest conversation.
(self, node: Node, input_data: str, inputs: List[Message], result: Message | str)
| 1251 | ) |
| 1252 | |
def _update_memory(self, node: Node, input_data: str, inputs: List[Message], result: Message | str) -> None:
    """Persist the latest exchange to the node's memory and log the operation.

    Returns immediately when the context yields no memory manager for
    ``node.id``. Otherwise a write payload is built from snapshots of
    ``inputs`` and ``result``, handed to the manager inside a memory timer,
    and the operation is recorded together with size metadata.

    Args:
        node: Node whose ``id`` selects the memory manager and whose ``role``
            becomes the payload's agent role ("" when the role is falsy).
        input_data: Input text stored as ``inputs_text``; the length of its
            string form is logged as ``input_size``.
        inputs: Messages snapshotted as the input side of the payload.
        result: Output snapshotted for the payload; logged through
            ``text_content()`` when it is a ``Message``, else through ``str()``.
    """
    manager = self.context.get_memory_manager(node.id)
    if not manager:
        # Nothing to write to for this node.
        return

    stage_name = AgentExecFlowStage.FINISHED_STAGE.value

    snapshot_in = MemoryContentSnapshot.from_messages(inputs)
    snapshot_out = MemoryContentSnapshot.from_message(result)
    write_request = MemoryWritePayload(
        agent_role=node.role or "",
        inputs_text=input_data,
        input_snapshot=snapshot_in,
        output_snapshot=snapshot_out,
    )

    with self.log_manager.memory_timer(node.id, "UPDATE", stage_name):
        manager.update(write_request)

    # Flatten the result to plain text for the operation log.
    if isinstance(result, Message):
        result_text = result.text_content()
    else:
        result_text = str(result)

    details = {
        "stage": stage_name,
        "input_size": len(str(input_data)),
        "output_size": len(result_text),
        "attachment_count": len(snapshot_out.attachment_overview()) if snapshot_out else 0,
    }
    self.log_manager.record_memory_operation(
        node.id,
        "UPDATE",
        stage_name,
        result_text,
        details,
    )
| 1287 | |
| 1288 | def _build_thinking_payload_from_inputs(self, inputs: List[Message], input_text: str) -> ThinkingPayload: |
| 1289 | blocks: List[MessageBlock] = [] |
no test coverage detected