From c7a797076b401181135271e47a9cb5340b42ad13 Mon Sep 17 00:00:00 2001
From: 席阳阳
Date: Mon, 22 Sep 2025 17:36:10 +0800
Subject: [PATCH 1/5] feat: add default processing in mem-reader

---
 src/memos/mem_reader/simple_struct.py | 178 ++++++++++++++++----------
 1 file changed, 108 insertions(+), 70 deletions(-)

diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py
index d64e0a02b..693e4d5de 100644
--- a/src/memos/mem_reader/simple_struct.py
+++ b/src/memos/mem_reader/simple_struct.py
@@ -56,44 +56,60 @@ def detect_lang(text):
 
 def _build_node(idx, message, info, scene_file, llm, parse_json_result, embedder):
     # generate
-    raw = llm.generate(message)
-    if not raw:
+    try:
+        raw = llm.generate(message)
+        if not raw:
+            logger.warning(f"[LLM] Empty generation for input: {message}")
+            return None
+    except Exception as e:
+        logger.error(f"[LLM] Exception during generation: {e}")
         return None
 
     # parse_json_result
-    chunk_res = parse_json_result(raw)
-    if not chunk_res:
+    try:
+        chunk_res = parse_json_result(raw)
+        if not chunk_res:
+            logger.warning(f"[Parse] Failed to parse result: {raw}")
+            return None
+    except Exception as e:
+        logger.error(f"[Parse] Exception during JSON parsing: {e}")
         return None
 
-    value = chunk_res.get("value")
-    if not value:
+    try:
+        value = (chunk_res.get("value") or "").strip()
+        if not value:
+            logger.warning("[BuildNode] value is empty")
+            return None
+
+        tags = chunk_res.get("tags", [])
+        if not isinstance(tags, list):
+            tags = []
+
+        key = chunk_res.get("key", None)
+
+        embedding = embedder.embed([value])[0]
+
+        return TextualMemoryItem(
+            memory=value,
+            metadata=TreeNodeTextualMemoryMetadata(
+                user_id=info.get("user_id", ""),
+                session_id=info.get("session_id", ""),
+                memory_type="LongTermMemory",
+                status="activated",
+                tags=tags,
+                key=key,
+                embedding=embedding,
+                usage=[],
+                sources=[{"type": "doc", "doc_path": f"{scene_file}_{idx}"}],
+                background="",
+                confidence=0.99,
+                type="fact",
+            ),
+        )
+    except Exception as e:
+        logger.error(f"[BuildNode] Error building node: {e}")
         return None
 
-    # embed
-    embedding = embedder.embed([value])[0]
-
-    # TextualMemoryItem
-    tags = chunk_res["tags"] if isinstance(chunk_res.get("tags"), list) else []
-    key = chunk_res.get("key", None)
-    node_i = TextualMemoryItem(
-        memory=value,
-        metadata=TreeNodeTextualMemoryMetadata(
-            user_id=info.get("user_id"),
-            session_id=info.get("session_id"),
-            memory_type="LongTermMemory",
-            status="activated",
-            tags=tags,
-            key=key,
-            embedding=embedding,
-            usage=[],
-            sources=[{"type": "doc", "doc_path": f"{scene_file}_{idx}"}],
-            background="",
-            confidence=0.99,
-            type="fact",
-        ),
-    )
-    return node_i
-
 
 class SimpleStructMemReader(BaseMemReader, ABC):
     """Naive implementation of MemReader."""
@@ -129,40 +145,57 @@ def _process_chat_data(self, scene_data_info, info):
 
         messages = [{"role": "user", "content": prompt}]
 
-        response_text = self.llm.generate(messages)
-        response_json = self.parse_json_result(response_text)
+        try:
+            response_text = self.llm.generate(messages)
+            response_json = self.parse_json_result(response_text)
+        except Exception as e:
+            logger.error(f"[LLM] Exception during chat generation: {e}")
+            response_json = {
+                "memory list": [
+                    {
+                        "key": "\n".join(mem_list)[:10],
+                        "memory_type": "UserMemory",
+                        "value": "\n".join(mem_list),
+                        "tags": [],
+                    }
+                ],
+                "summary": "\n".join(mem_list),
+            }
 
         chat_read_nodes = []
         for memory_i_raw in response_json.get("memory list", []):
-            memory_type = (
-                memory_i_raw.get("memory_type", "LongTermMemory")
-                .replace("长期记忆", "LongTermMemory")
-                .replace("用户记忆", "UserMemory")
-            )
-
-            if memory_type not in ["LongTermMemory", "UserMemory"]:
-                memory_type = "LongTermMemory"
-
-            node_i = TextualMemoryItem(
-                memory=memory_i_raw.get("value", ""),
-                metadata=TreeNodeTextualMemoryMetadata(
-                    user_id=info.get("user_id"),
-                    session_id=info.get("session_id"),
-                    memory_type=memory_type,
-                    status="activated",
-                    tags=memory_i_raw.get("tags", [])
-                    if type(memory_i_raw.get("tags", [])) is list
-                    else [],
-                    key=memory_i_raw.get("key", ""),
-                    embedding=self.embedder.embed([memory_i_raw.get("value", "")])[0],
-                    usage=[],
-                    sources=scene_data_info,
-                    background=response_json.get("summary", ""),
-                    confidence=0.99,
-                    type="fact",
-                ),
-            )
-            chat_read_nodes.append(node_i)
+            try:
+                memory_type = (
+                    memory_i_raw.get("memory_type", "LongTermMemory")
+                    .replace("长期记忆", "LongTermMemory")
+                    .replace("用户记忆", "UserMemory")
+                )
+
+                if memory_type not in ["LongTermMemory", "UserMemory"]:
+                    memory_type = "LongTermMemory"
+
+                node_i = TextualMemoryItem(
+                    memory=memory_i_raw.get("value", ""),
+                    metadata=TreeNodeTextualMemoryMetadata(
+                        user_id=info.get("user_id"),
+                        session_id=info.get("session_id"),
+                        memory_type=memory_type,
+                        status="activated",
+                        tags=memory_i_raw.get("tags", [])
+                        if isinstance(memory_i_raw.get("tags", []), list)
+                        else [],
+                        key=memory_i_raw.get("key", ""),
+                        embedding=self.embedder.embed([memory_i_raw.get("value", "")])[0],
+                        usage=[],
+                        sources=scene_data_info,
+                        background=response_json.get("summary", ""),
+                        confidence=0.99,
+                        type="fact",
+                    ),
+                )
+                chat_read_nodes.append(node_i)
+            except Exception as e:
+                logger.error(f"[ChatReader] Error parsing memory item: {e}")
 
         return chat_read_nodes
 
@@ -267,8 +300,12 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]:
         for item in scene_data:
             try:
                 if os.path.exists(item):
-                    parsed_text = parser.parse(item)
-                    results.append({"file": item, "text": parsed_text})
+                    try:
+                        parsed_text = parser.parse(item)
+                        results.append({"file": item, "text": parsed_text})
+                    except Exception as e:
+                        logger.error(f"[SceneParser] Error parsing {item}: {e}")
+                        continue
                 else:
                     parsed_text = item
                     results.append({"file": "pure_text", "text": parsed_text})
@@ -315,6 +352,7 @@ def _process_doc_data(self, scene_data_info, info, **kwargs):
                     doc_nodes.append(node)
                 except Exception as e:
                     tqdm.write(f"[ERROR] {e}")
+                    logger.error(f"[DocReader] Future task failed: {e}")
         return doc_nodes
 
     def parse_json_result(self, response_text):
         try:
             json_start = response_text.find("{")
             response_text = response_text[json_start:]
             response_text = response_text.replace("```", "").strip()
-            if response_text[-1] != "}":
+            if not response_text.endswith("}"):
                 response_text += "}"
-            response_json = json.loads(response_text)
-            return response_json
+            return json.loads(response_text)
         except json.JSONDecodeError as e:
-            logger.warning(
-                f"Failed to parse LLM response as JSON: {e}\nRaw response:\n{response_text}"
-            )
+            logger.error(f"[JSONParse] Failed to decode JSON: {e}\nRaw:\n{response_text}")
+            return {}
+        except Exception as e:
+            logger.error(f"[JSONParse] Unexpected error: {e}")
             return {}
 
     def transform_memreader(self, data: dict) -> list[TextualMemoryItem]:
From 9a30174a2115ca215feec884d985e1e7b409a28a Mon Sep 17 00:00:00 2001
From: 席阳阳
Date: Tue, 23 Sep 2025 16:54:32 +0800
Subject: [PATCH 2/5] feat: do not include embedding

---
 .../memories/textual/tree_text_memory/retrieve/recall.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py
index a98773014..55bf673be 100644
--- a/src/memos/memories/textual/tree_text_memory/retrieve/recall.py
+++ b/src/memos/memories/textual/tree_text_memory/retrieve/recall.py
@@ -53,7 +53,7 @@ def retrieve(
         if memory_scope == "WorkingMemory":
             # For working memory, retrieve all entries (no filtering)
             working_memories = self.graph_store.get_all_memory_items(
-                scope="WorkingMemory", include_embedding=True
+                scope="WorkingMemory", include_embedding=False
             )
             return [TextualMemoryItem.from_dict(record) for record in working_memories]
 
@@ -165,7 +165,7 @@ def _graph_recall(
             return []
 
         # Load nodes and post-filter
-        node_dicts = self.graph_store.get_nodes(list(candidate_ids), include_embedding=True)
+        node_dicts = self.graph_store.get_nodes(list(candidate_ids), include_embedding=False)
 
         final_nodes = []
         for node in node_dicts:
@@ -240,7 +240,7 @@ def search_single(vec, filt=None):
         unique_ids = {r["id"] for r in all_hits if r.get("id")}
         node_dicts = (
             self.graph_store.get_nodes(
-                list(unique_ids), include_embedding=True, cube_name=cube_name
+                list(unique_ids), include_embedding=False, cube_name=cube_name
             )
             or []
         )

From 8bc9a94ceae24396c78ed57da51406a53ff74d42 Mon Sep 17 00:00:00 2001
From: 席阳阳
Date: Tue, 23 Sep 2025 20:58:39 +0800
Subject: [PATCH 3/5] feat: add logger to detect remove-old-memory error

---
 .../tree_text_memory/organize/manager.py      | 30 +++++++++++++------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git a/src/memos/memories/textual/tree_text_memory/organize/manager.py b/src/memos/memories/textual/tree_text_memory/organize/manager.py
index 9e4890364..c9cd4de8a 100644
--- a/src/memos/memories/textual/tree_text_memory/organize/manager.py
+++ b/src/memos/memories/textual/tree_text_memory/organize/manager.py
@@ -1,3 +1,4 @@
+import traceback
 import uuid
 
 from concurrent.futures import as_completed
@@ -65,15 +66,26 @@ def add(self, memories: list[TextualMemoryItem]) -> list[str]:
             except Exception as e:
                 logger.exception("Memory processing error: ", exc_info=e)
 
-        self.graph_store.remove_oldest_memory(
-            memory_type="WorkingMemory", keep_latest=self.memory_size["WorkingMemory"]
-        )
-        self.graph_store.remove_oldest_memory(
-            memory_type="LongTermMemory", keep_latest=self.memory_size["LongTermMemory"]
-        )
-        self.graph_store.remove_oldest_memory(
-            memory_type="UserMemory", keep_latest=self.memory_size["UserMemory"]
-        )
+        try:
+            self.graph_store.remove_oldest_memory(
+                memory_type="WorkingMemory", keep_latest=self.memory_size["WorkingMemory"]
+            )
+        except Exception:
+            logger.warning(f"Remove WorkingMemory error: {traceback.format_exc()}")
+
+        try:
+            self.graph_store.remove_oldest_memory(
+                memory_type="LongTermMemory", keep_latest=self.memory_size["LongTermMemory"]
+            )
+        except Exception:
+            logger.warning(f"Remove LongTermMemory error: {traceback.format_exc()}")
+
+        try:
+            self.graph_store.remove_oldest_memory(
+                memory_type="UserMemory", keep_latest=self.memory_size["UserMemory"]
+            )
+        except Exception:
+            logger.warning(f"Remove UserMemory error: {traceback.format_exc()}")
 
         self._refresh_memory_size()
         return added_ids
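The three guarded evictions above are deliberately independent, so a failure
in one scope cannot skip the remaining scopes. For reference, an equivalent
loop-based form of the same method body (a hypothetical refactor, not what the
patch ships; self.graph_store and self.memory_size are the fields already used
above, inside MemoryManager.add):

    import traceback

    # Evict each memory scope independently; one failing scope is logged
    # and the remaining scopes are still processed.
    for memory_type in ("WorkingMemory", "LongTermMemory", "UserMemory"):
        try:
            self.graph_store.remove_oldest_memory(
                memory_type=memory_type, keep_latest=self.memory_size[memory_type]
            )
        except Exception:
            logger.warning(f"Remove {memory_type} error: {traceback.format_exc()}")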
From c101108039e336a8daebdd1d086cd5a1fcb26bbc Mon Sep 17 00:00:00 2001
From: 席阳阳
Date: Wed, 24 Sep 2025 19:24:59 +0800
Subject: [PATCH 4/5] feat: add timing logs to openai/qwen llm calls

---
 src/memos/llms/openai.py | 3 +++
 src/memos/llms/qwen.py   | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/src/memos/llms/openai.py b/src/memos/llms/openai.py
index 698bc3265..47818a948 100644
--- a/src/memos/llms/openai.py
+++ b/src/memos/llms/openai.py
@@ -11,6 +11,7 @@
 from memos.llms.utils import remove_thinking_tags
 from memos.log import get_logger
 from memos.types import MessageList
+from memos.utils import timed
 
 logger = get_logger(__name__)
 
@@ -56,6 +57,7 @@ def clear_cache(cls):
         cls._instances.clear()
         logger.info("OpenAI LLM instance cache cleared")
 
+    @timed
     def generate(self, messages: MessageList) -> str:
         """Generate a response from OpenAI LLM."""
         response = self.client.chat.completions.create(
@@ -73,6 +75,7 @@ def generate(self, messages: MessageList) -> str:
         else:
             return response_content
 
+    @timed
     def generate_stream(self, messages: MessageList, **kwargs) -> Generator[str, None, None]:
         """Stream response from OpenAI LLM with optional reasoning support."""
         response = self.client.chat.completions.create(
diff --git a/src/memos/llms/qwen.py b/src/memos/llms/qwen.py
index a47fcdf36..b2d662ffd 100644
--- a/src/memos/llms/qwen.py
+++ b/src/memos/llms/qwen.py
@@ -5,6 +5,7 @@
 from memos.llms.utils import remove_thinking_tags
 from memos.log import get_logger
 from memos.types import MessageList
+from memos.utils import timed
 
 logger = get_logger(__name__)
 
@@ -16,6 +17,7 @@ class QwenLLM(OpenAILLM):
     def __init__(self, config: QwenLLMConfig):
         super().__init__(config)
 
+    @timed
     def generate(self, messages: MessageList) -> str:
         """Generate a response from Qwen LLM."""
         response = self.client.chat.completions.create(
@@ -33,6 +35,7 @@ def generate(self, messages: MessageList) -> str:
         else:
             return response_content
 
+    @timed
     def generate_stream(self, messages: MessageList, **kwargs) -> Generator[str, None, None]:
         """Stream response from Qwen LLM."""
         response = self.client.chat.completions.create(
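Both files import timed from memos.utils, whose definition is not part of this
series. A plausible minimal sketch of such a decorator (an assumption, not the
actual memos.utils code):

    import functools
    import logging
    import time

    logger = logging.getLogger(__name__)

    def timed(func):
        """Log the wall-clock duration of the wrapped call."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Logged even if the wrapped call raises.
                elapsed = time.perf_counter() - start
                logger.info("%s took %.3fs", func.__qualname__, elapsed)
        return wrapper

Note that on a generator function such as generate_stream, a wrapper like this
times only generator construction; measuring the full stream would require
wrapping the iteration as well.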
From 4bb4b5c51e678f2e6fd9a9fddc3a79d6cc152b42 Mon Sep 17 00:00:00 2001
From: chunyu li <78344051+fridayL@users.noreply.github.com>
Date: Wed, 24 Sep 2025 20:36:58 +0800
Subject: [PATCH 5/5] add: change default pre_load (#338)

* add: change default pre_load

* fix: code

---------

Co-authored-by: CaralHsi
---
 src/memos/api/product_api.py                        | 2 +-
 src/memos/mem_os/product.py                         | 4 ++--
 src/memos/mem_user/mysql_persistent_user_manager.py | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/memos/api/product_api.py b/src/memos/api/product_api.py
index 681644a0d..709ad74fb 100644
--- a/src/memos/api/product_api.py
+++ b/src/memos/api/product_api.py
@@ -33,6 +33,6 @@
     parser = argparse.ArgumentParser()
     parser.add_argument("--port", type=int, default=8001)
-    parser.add_argument("--workers", type=int, default=32)
+    parser.add_argument("--workers", type=int, default=1)
     args = parser.parse_args()
     uvicorn.run("memos.api.product_api:app", host="0.0.0.0", port=args.port, workers=args.workers)
diff --git a/src/memos/mem_os/product.py b/src/memos/mem_os/product.py
index a4ab4ef20..d64643897 100644
--- a/src/memos/mem_os/product.py
+++ b/src/memos/mem_os/product.py
@@ -179,14 +179,14 @@ def _restore_user_instances(
         """
         try:
             # Get all user configurations from persistent storage
-            user_configs = self.user_manager.list_user_configs()
+            user_configs = self.user_manager.list_user_configs(self.max_user_instances)
 
             # Get the raw database records for sorting by updated_at
             session = self.user_manager._get_session()
             try:
                 from memos.mem_user.persistent_user_manager import UserConfig
 
-                db_configs = session.query(UserConfig).all()
+                db_configs = session.query(UserConfig).limit(self.max_user_instances).all()
 
                 # Create a mapping of user_id to updated_at timestamp
                 updated_at_map = {config.user_id: config.updated_at for config in db_configs}
 
diff --git a/src/memos/mem_user/mysql_persistent_user_manager.py b/src/memos/mem_user/mysql_persistent_user_manager.py
index f8983c87c..99e49d206 100644
--- a/src/memos/mem_user/mysql_persistent_user_manager.py
+++ b/src/memos/mem_user/mysql_persistent_user_manager.py
@@ -188,7 +188,7 @@ def delete_user_config(self, user_id: str) -> bool:
         finally:
             session.close()
 
-    def list_user_configs(self) -> dict[str, MOSConfig]:
+    def list_user_configs(self, limit: int = 1) -> dict[str, MOSConfig]:
         """List all user configurations.
 
         Returns:
@@ -196,7 +196,7 @@ def list_user_configs(self) -> dict[str, MOSConfig]:
         """
         session = self._get_session()
         try:
-            user_configs = session.query(UserConfig).all()
+            user_configs = session.query(UserConfig).limit(limit).all()
             result = {}
             for user_config in user_configs:
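One caveat on the new preload cap: .limit() without an ORDER BY returns
database-dependent rows, and the sort by updated_at in _restore_user_instances
happens only after the capped fetch. If the intent is to restore the most
recently active users, the ordering has to precede the limit; a sketch of that
variant (hypothetical, not part of the patch; UserConfig.updated_at is the
column already used for the post-fetch sort):

    from sqlalchemy import desc

    # Fetch the N most recently updated configs rather than an arbitrary N rows.
    db_configs = (
        session.query(UserConfig)
        .order_by(desc(UserConfig.updated_at))
        .limit(self.max_user_instances)
        .all()
    )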