Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 15 additions & 138 deletions src/memos/mem_scheduler/base_mixins/memory_ops.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

from memos.log import get_logger
from memos.mem_scheduler.schemas.monitor_schemas import MemoryMonitorItem
from memos.mem_scheduler.utils.db_utils import get_utc_now
from memos.mem_scheduler.utils.filter_utils import transform_name_to_key
from memos.memories.activation.kv import KVCacheMemory
from memos.memories.activation.vllmkv import VLLMKVCacheItem, VLLMKVCacheMemory
from memos.memories.textual.naive import NaiveTextMemory
from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory
from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE


if TYPE_CHECKING:
Expand Down Expand Up @@ -201,71 +196,16 @@ def update_activation_memory(
mem_cube_id: MemCubeID | str,
mem_cube,
) -> None:
if len(new_memories) == 0:
logger.error("update_activation_memory: new_memory is empty.")
return
if isinstance(new_memories[0], TextualMemoryItem):
new_text_memories = [mem.memory for mem in new_memories]
elif isinstance(new_memories[0], str):
new_text_memories = new_memories
else:
logger.error("Not Implemented.")
return

try:
if isinstance(mem_cube.act_mem, VLLMKVCacheMemory):
act_mem: VLLMKVCacheMemory = mem_cube.act_mem
elif isinstance(mem_cube.act_mem, KVCacheMemory):
act_mem = mem_cube.act_mem
else:
logger.error("Not Implemented.")
return

new_text_memory = MEMORY_ASSEMBLY_TEMPLATE.format(
memory_text="".join(
[
f"{i + 1}. {sentence.strip()}\n"
for i, sentence in enumerate(new_text_memories)
if sentence.strip()
]
)
)

original_cache_items: list[VLLMKVCacheItem] = act_mem.get_all()
original_text_memories = []
if len(original_cache_items) > 0:
pre_cache_item: VLLMKVCacheItem = original_cache_items[-1]
original_text_memories = pre_cache_item.records.text_memories
original_composed_text_memory = pre_cache_item.records.composed_text_memory
if original_composed_text_memory == new_text_memory:
logger.warning(
"Skipping memory update - new composition matches existing cache: %s",
new_text_memory[:50] + "..."
if len(new_text_memory) > 50
else new_text_memory,
)
return
act_mem.delete_all()

cache_item = act_mem.extract(new_text_memory)
cache_item.records.text_memories = new_text_memories
cache_item.records.timestamp = get_utc_now()

act_mem.add([cache_item])
act_mem.dump(self.act_mem_dump_path)

self.log_activation_memory_update(
original_text_memories=original_text_memories,
new_text_memories=new_text_memories,
if hasattr(self, "activation_memory_manager") and self.activation_memory_manager:
self.activation_memory_manager.update_activation_memory(
new_memories=new_memories,
label=label,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
log_func_callback=self._submit_web_logs,
)

except Exception as e:
logger.error("MOS-based activation memory update failed: %s", e, exc_info=True)
else:
logger.warning("Activation memory manager not initialized")

def update_activation_memory_periodically(
self,
Expand All @@ -275,76 +215,13 @@ def update_activation_memory_periodically(
mem_cube_id: MemCubeID | str,
mem_cube,
):
try:
if (
self.monitor.last_activation_mem_update_time == datetime.min
or self.monitor.timed_trigger(
last_time=self.monitor.last_activation_mem_update_time,
interval_seconds=interval_seconds,
)
):
logger.info(
"Updating activation memory for user %s and mem_cube %s",
user_id,
mem_cube_id,
)

if (
user_id not in self.monitor.working_memory_monitors
or mem_cube_id not in self.monitor.working_memory_monitors[user_id]
or len(self.monitor.working_memory_monitors[user_id][mem_cube_id].obj.memories)
== 0
):
logger.warning(
"No memories found in working_memory_monitors, activation memory update is skipped"
)
return

self.monitor.update_activation_memory_monitors(
user_id=user_id, mem_cube_id=mem_cube_id, mem_cube=mem_cube
)

activation_db_manager = self.monitor.activation_memory_monitors[user_id][
mem_cube_id
]
activation_db_manager.sync_with_orm()
new_activation_memories = [
m.memory_text for m in activation_db_manager.obj.memories
]

logger.info(
"Collected %s new memory entries for processing",
len(new_activation_memories),
)
for i, memory in enumerate(new_activation_memories[:5], 1):
logger.info(
"Part of New Activation Memories | %s/%s: %s",
i,
len(new_activation_memories),
memory[:20],
)

self.update_activation_memory(
new_memories=new_activation_memories,
label=label,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
)

self.monitor.last_activation_mem_update_time = get_utc_now()

logger.debug(
"Activation memory update completed at %s",
self.monitor.last_activation_mem_update_time,
)

else:
logger.info(
"Skipping update - %s second interval not yet reached. Last update time is %s and now is %s",
interval_seconds,
self.monitor.last_activation_mem_update_time,
get_utc_now(),
)
except Exception as e:
logger.error("Error in update_activation_memory_periodically: %s", e, exc_info=True)
if hasattr(self, "activation_memory_manager") and self.activation_memory_manager:
self.activation_memory_manager.update_activation_memory_periodically(
interval_seconds=interval_seconds,
label=label,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
)
else:
logger.warning("Activation memory manager not initialized")
9 changes: 8 additions & 1 deletion src/memos/mem_scheduler/base_mixins/queue_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,14 @@ def handlers(self) -> dict[str, Callable]:
return self.dispatcher.handlers

def register_handlers(
self, handlers: dict[str, Callable[[list[ScheduleMessageItem]], None]]
self,
handlers: dict[
str,
Callable[[list[ScheduleMessageItem]], None]
| tuple[
Callable[[list[ScheduleMessageItem]], None], TaskPriorityLevel | None, int | None
],
],
) -> None:
if not self.dispatcher:
logger.warning("Dispatcher is not initialized, cannot register handlers")
Expand Down
68 changes: 68 additions & 0 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
from memos.mem_scheduler.general_modules.init_components_for_scheduler import init_components
from memos.mem_scheduler.general_modules.misc import AutoDroppingQueue as Queue
from memos.mem_scheduler.general_modules.scheduler_logger import SchedulerLoggerModule
from memos.mem_scheduler.memory_manage_modules.activation_memory_manager import (
ActivationMemoryManager,
)
from memos.mem_scheduler.memory_manage_modules.post_processor import MemoryPostProcessor
from memos.mem_scheduler.memory_manage_modules.retriever import SchedulerRetriever
from memos.mem_scheduler.memory_manage_modules.search_service import SchedulerSearchService
from memos.mem_scheduler.monitors.dispatcher_monitor import SchedulerDispatcherMonitor
from memos.mem_scheduler.monitors.general_monitor import SchedulerGeneralMonitor
from memos.mem_scheduler.monitors.task_schedule_monitor import TaskScheduleMonitor
Expand Down Expand Up @@ -51,6 +56,7 @@
from memos.mem_cube.base import BaseMemCube
from memos.mem_feedback.simple_feedback import SimpleMemFeedback
from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem
from memos.memories.textual.item import TextualMemoryItem
from memos.memories.textual.tree import TreeTextMemory
from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher
from memos.reranker.http_bge import HTTPBGEReranker
Expand Down Expand Up @@ -118,6 +124,9 @@ def __init__(self, config: BaseSchedulerConfig):
self.orchestrator = SchedulerOrchestrator()

self.searcher: Searcher | None = None
self.search_service: SchedulerSearchService | None = None
self.post_processor: MemoryPostProcessor | None = None
self.activation_memory_manager: ActivationMemoryManager | None = None
self.retriever: SchedulerRetriever | None = None
self.db_engine: Engine | None = None
self.monitor: SchedulerGeneralMonitor | None = None
Expand Down Expand Up @@ -187,6 +196,9 @@ def init_mem_cube(
self.searcher = searcher
self.feedback_server = feedback_server

# Initialize search service with the searcher
self.search_service = SchedulerSearchService(searcher=self.searcher)

def initialize_modules(
self,
chat_llm: BaseLLM,
Expand Down Expand Up @@ -217,6 +229,20 @@ def initialize_modules(
self.dispatcher_monitor = SchedulerDispatcherMonitor(config=self.config)
self.retriever = SchedulerRetriever(process_llm=self.process_llm, config=self.config)



# Initialize post-processor for memory enhancement and filtering
self.post_processor = MemoryPostProcessor(
process_llm=self.process_llm, config=self.config
)

self.activation_memory_manager = ActivationMemoryManager(
act_mem_dump_path=self.act_mem_dump_path,
monitor=self.monitor,
log_func_callback=self._submit_web_logs,
log_activation_memory_update_func=self.log_activation_memory_update,
)

if mem_reader:
self.mem_reader = mem_reader

Expand Down Expand Up @@ -366,3 +392,45 @@ def mem_cubes(self, value: dict[str, BaseMemCube]) -> None:
)

# Methods moved to mixins in mem_scheduler.base_mixins.

def update_activation_memory(
self,
new_memories: list[str | TextualMemoryItem],
label: str,
user_id: UserID | str,
mem_cube_id: MemCubeID | str,
mem_cube: BaseMemCube,
) -> None:
"""
Update activation memory by extracting KVCacheItems from new_memory (list of str),
add them to a KVCacheMemory instance, and dump to disk.
"""
if self.activation_memory_manager:
self.activation_memory_manager.update_activation_memory(
new_memories=new_memories,
label=label,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
)
else:
logger.warning("Activation memory manager not initialized")

def update_activation_memory_periodically(
self,
interval_seconds: int,
label: str,
user_id: UserID | str,
mem_cube_id: MemCubeID | str,
mem_cube: BaseMemCube,
):
if self.activation_memory_manager:
self.activation_memory_manager.update_activation_memory_periodically(
interval_seconds=interval_seconds,
label=label,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
)
else:
logger.warning("Activation memory manager not initialized")
6 changes: 3 additions & 3 deletions src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from memos.configs.mem_scheduler import GeneralSchedulerConfig
from memos.mem_scheduler.base_scheduler import BaseScheduler
from memos.mem_scheduler.handlers import (
from memos.mem_scheduler.task_schedule_modules.handlers import (
SchedulerHandlerContext,
SchedulerHandlerRegistry,
SchedulerHandlerServices,
Expand All @@ -31,7 +31,7 @@ def __init__(self, config: GeneralSchedulerConfig):
transform_working_memories_to_monitors=self.transform_working_memories_to_monitors,
log_working_memory_replacement=self.log_working_memory_replacement,
)
ctx = SchedulerHandlerContext(
scheduler_context = SchedulerHandlerContext(
get_mem_cube=lambda: self.mem_cube,
get_monitor=lambda: self.monitor,
get_retriever=lambda: self.retriever,
Expand All @@ -44,5 +44,5 @@ def __init__(self, config: GeneralSchedulerConfig):
services=services,
)

self._handler_registry = SchedulerHandlerRegistry(ctx)
self._handler_registry = SchedulerHandlerRegistry(scheduler_context)
self.register_handlers(self._handler_registry.build_dispatch_map())
9 changes: 0 additions & 9 deletions src/memos/mem_scheduler/handlers/__init__.py

This file was deleted.

56 changes: 0 additions & 56 deletions src/memos/mem_scheduler/handlers/answer_handler.py

This file was deleted.

Loading