Skip to content

Commit

Permalink
memory management reworked & expanded
Browse files Browse the repository at this point in the history
- corrected async handling & management for memory slots
- removed data caching for the time being (it was redundant with the current architecture)
- updated testing suites
- housekeeping, formatting, etc.
  • Loading branch information
lane-neuro committed Sep 26, 2024
1 parent 3a549c7 commit 3721ad7
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def is_number(value):
return False

@command
def load_data(self, file_path, return_type='dict'):
def load_data(self, file_path, return_type='dict') :
data = None
data_type = os.path.splitext(file_path)[1][1:]
self._logger.info(f"Loading data from {file_path} as {data_type}")
Expand Down Expand Up @@ -232,10 +232,11 @@ def load_data(self, file_path, return_type='dict'):
elif return_type != 'dataframe':
self._logger.error(ValueError(f"Unsupported return type: {return_type}"), self.__class__.__name__)

return data
return data, data_type

except Exception as e:
self._logger.error(e, self.__class__.__name__)
raise e

@command
def save_data(self, file_path: str):
Expand Down
113 changes: 57 additions & 56 deletions research_analytics_suite/data_engine/memory/MemoryManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import asyncio
import os
import uuid

import aiosqlite
from typing import Type

from research_analytics_suite.commands import link_class_commands, command
from research_analytics_suite.data_engine.memory.MemorySlot import MemorySlot
Expand Down Expand Up @@ -72,6 +71,7 @@ def __init__(self, db_path: str = "memory_manager.db", cache_backend: str = 'cac
self._cache_directory = cache_directory

self._data_cache = None
self._slot_collection = {}

self._initialized = False

Expand All @@ -88,32 +88,41 @@ async def initialize(self):
self._logger = CustomLogger()
self._config = Config()

try:
self._db_path = os.path.normpath(self._db_path)
except Exception as e:
self._db_path = os.path.normpath(os.path.join(
self._config.BASE_DIR, self._config.WORKSPACE_NAME,
self._config.DATA_DIR, "memory_manager.db"))

try:
self._cache_directory = os.path.normpath(self._cache_directory)
except Exception as e:
self._cache_directory = os.path.normpath(
os.path.join(self._config.BASE_DIR, self._config.WORKSPACE_NAME,
self._config.DATA_DIR, self._config.CACHE_DIR))
await self._initialize_data_cache()

self._data_cache = DataCache(backend=self._cache_backend, directory=self._cache_directory)
await self._data_cache.initialize()
self._logger.debug("MemoryManager initialized.")
self._initialized = True

async def _initialize_data_cache(self):
    """
    Normalizes the storage paths and starts the data cache for the MemoryManager.

    The database path and the cache directory are each passed through
    ``os.path.normpath``. If normalization raises for either value, that value is
    replaced by a path assembled from the Config (BASE_DIR / WORKSPACE_NAME / DATA_DIR).
    A DataCache is then created with the configured backend and cache directory, and
    its ``initialize`` coroutine is awaited.
    """
    try:
        db_location = os.path.normpath(self._db_path)
    except Exception:
        # Fall back to the workspace data directory when the supplied path cannot be normalized.
        db_location = os.path.normpath(os.path.join(
            self._config.BASE_DIR, self._config.WORKSPACE_NAME,
            self._config.DATA_DIR, "memory_manager.db"))
    self._db_path = db_location

    try:
        cache_location = os.path.normpath(self._cache_directory)
    except Exception:
        # Same fallback strategy for the cache directory.
        cache_location = os.path.normpath(
            os.path.join(self._config.BASE_DIR, self._config.WORKSPACE_NAME,
                         self._config.DATA_DIR, self._config.CACHE_DIR))
    self._cache_directory = cache_location

    cache = DataCache(backend=self._cache_backend, directory=self._cache_directory)
    self._data_cache = cache
    await cache.initialize()

@command
async def create_slot(self, name: str, data: any, db_path: str = None, file_path: str = None) -> str:
async def create_slot(self, name: str, d_type: type, data: any = None, db_path: str = None, file_path: str = None) \
-> str:
"""
Creates a new memory slot and stores it in both cache and SQLite storage.
Args:
name (str): The name of the memory slot.
d_type (Type): The data type of the memory slot.
data (any): The data to store in the memory slot.
db_path (str): The path to the SQLite database file.
file_path (str): The file path for memory-mapped storage.
Expand All @@ -129,10 +138,15 @@ async def create_slot(self, name: str, data: any, db_path: str = None, file_path
file_path = None

_id = uuid.uuid4().hex[:8]
memory_slot = MemorySlot(memory_id=_id, name=name, data=data, db_path=db_path, file_path=file_path)
memory_slot = MemorySlot(
memory_id=_id, name=name, d_type=d_type, data=data, db_path=db_path, file_path=file_path)
self._slot_collection[memory_slot.memory_id] = memory_slot
await memory_slot.setup()
self._data_cache.set(key=memory_slot.memory_id, data=self.get_slot(memory_id=_id))
return memory_slot.memory_id

try:
self._data_cache.set(key=memory_slot.memory_id, data=lambda: self.get_slot(memory_id=_id))
finally:
return memory_slot.memory_id

@command
async def update_slot(self, memory_id: str, data: any) -> str:
Expand All @@ -150,10 +164,10 @@ async def update_slot(self, memory_id: str, data: any) -> str:
if memory_slot:
await memory_slot.set_data(data)
else:
memory_slot = MemorySlot(memory_id=memory_id, name="", data=data, db_path=self._db_path)
d_type = type(data)
memory_slot = MemorySlot(memory_id=memory_id, name="", d_type=d_type, data=data, db_path=self._db_path)
await memory_slot.setup()
await memory_slot.set_data(data)
self._data_cache.set(key=memory_id, data=self.get_slot(memory_id=memory_id))
return memory_id

@command
Expand All @@ -172,23 +186,16 @@ async def delete_slot(self, memory_id: str) -> None:
self._data_cache.delete(key=memory_id)

@command
async def list_slots(self) -> list:
def list_slots(self) -> list:
"""
Lists all memory slots stored in the SQLite storage.
Returns:
list: A dictionary of memory slots.
"""
memory_slots = []
for memory_slot in self._data_cache.cache_values():
if not isinstance(memory_slot, MemorySlot):
continue
print(memory_slot)
memory_slots.append(memory_slot)

return memory_slots
return list(self._slot_collection.values())

async def slot_data(self, memory_id: str) -> any:
def slot_data(self, memory_id: str) -> any:
"""
Retrieves the data stored in a memory slot.
Expand All @@ -198,18 +205,9 @@ async def slot_data(self, memory_id: str) -> any:
Returns:
any: The data stored in the memory slot.
"""
memory_slot = self._data_cache.get_key(key=memory_id)
if memory_slot is not None:
return await memory_slot.data
return self._slot_collection[memory_id].data

memory_slot = MemorySlot(memory_id=memory_id, name="", data=None, db_path=self._db_path)
await memory_slot.setup()
data = await memory_slot.data
if data:
self._data_cache.set(key=memory_id, data=self.get_slot(memory_id=memory_id))
return data

async def slot_name(self, memory_id: str) -> str:
def slot_name(self, memory_id: str) -> str:
"""
Retrieves the name of a memory slot.
Expand All @@ -219,16 +217,19 @@ async def slot_name(self, memory_id: str) -> str:
Returns:
str: The name of the memory slot.
"""
memory_slot = self._data_cache.get_key(key=memory_id)
if memory_slot:
return memory_slot.name
return self._slot_collection[memory_id].name

memory_slot = MemorySlot(memory_id=memory_id, name="", data=None, db_path=self._db_path)
await memory_slot.setup()
name = memory_slot.name
if name:
self._data_cache.set(key=memory_id, data=self.get_slot(memory_id=memory_id))
return name
def slot_type(self, memory_id: str) -> Type:
    """
    Looks up a memory slot by its identifier and reports its data type.

    Args:
        memory_id (str): The unique identifier for the memory slot.

    Returns:
        Type: The data type of the memory slot.
    """
    slot = self._slot_collection[memory_id]
    return slot.data_type

@command
def get_slot(self, memory_id: str) -> MemorySlot:
Expand All @@ -241,9 +242,9 @@ def get_slot(self, memory_id: str) -> MemorySlot:
Returns:
MemorySlot: The MemorySlot instance.
"""
return self._data_cache.get_key(key=memory_id)
return self._slot_collection[memory_id]

async def validate_slots(self, memory_ids: list, require_values: bool = True) -> (list, list):
def validate_slots(self, memory_ids: list, require_values: bool = True) -> (list, list):
"""
Validates a list of memory slot identifiers and returns the valid and invalid slots.
Expand All @@ -256,7 +257,7 @@ async def validate_slots(self, memory_ids: list, require_values: bool = True) ->
"""
valid_slots = []
for memory_id in memory_ids:
data = await self.slot_data(memory_id=memory_id)
data = self.slot_data(memory_id=memory_id)
if data:
if require_values and not data:
continue
Expand Down
105 changes: 80 additions & 25 deletions research_analytics_suite/data_engine/memory/MemorySlot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import sys
import time
from mmap import mmap
from typing import Type

import aiosqlite
from research_analytics_suite.utils import CustomLogger
from research_analytics_suite.operation_manager.operations.system.UpdateMonitor import UpdateMonitor


class MemorySlot:
Expand All @@ -42,35 +45,44 @@ class MemorySlot:
"""
DATA_SIZE_THRESHOLD = 2e6 # 2MB

def __init__(self, memory_id: str, name: str, data: any, db_path: str, file_path: str = None):
def __init__(self, memory_id: str, name: str, d_type: type, data: any, db_path: str, file_path: str = None):
"""
Initialize the MemorySlot instance.
Args:
memory_id (str): A unique identifier for the memory slot.
name (str): A name for the memory slot.
d_type (type): The data type for the memory slot.
data (any): The data stored in the memory slot.
db_path (str): The path to the SQLite database file.
file_path (str): The file path for memory-mapped storage.
"""
self._logger = CustomLogger()

from research_analytics_suite.operation_manager.control.OperationControl import OperationControl
self._operation_control = OperationControl()

self._memory_id = memory_id
self.name = name
self.data_type = d_type
self._metadata = {}
self._created_at = time.time()
self._modified_at = self._created_at
self._last_modified = None
self._lock = asyncio.Lock()

self.db_path = db_path
self._file_path = file_path
self._use_mmap = False
self._mmapped_file = None
self._file = None # Ensure _file is always initialized
self._file = None

self._check_data_size(data)
self._initial_data = data

self._data = None
self._update_operation = None

async def setup(self) -> None:
"""
Sets up the SQLite database and creates the variables table if it does not exist.
Expand All @@ -97,6 +109,15 @@ async def setup(self) -> None:
if not row and self._initial_data is not None:
await self.set_data(self._initial_data)

# Create an update operation to update the data stored in the memory slot
try:
self._update_operation = await self._operation_control.operation_manager.create_operation(
operation_type=UpdateMonitor, name=f"slot_{self.memory_id}", action=self._update_data)
self._update_operation.is_ready = True
except Exception as e:
self._logger.error(e, self.__class__.__name__)


@property
def name(self) -> str:
"""Gets the name of the memory slot.
Expand Down Expand Up @@ -134,29 +155,31 @@ def file_path(self) -> str:
return self._file_path or ""

@property
async def data(self) -> any:
"""Gets the data stored in the memory slot."""
async with self._lock:
if self._use_mmap:
if self._mmapped_file:
self._mmapped_file.seek(0)
serialized_data = self._mmapped_file.read()
return pickle.loads(serialized_data)
else:
try:
async with aiosqlite.connect(self.db_path) as conn:
cursor = await conn.execute(
"SELECT data FROM variables WHERE memory_id = ?",
(self._memory_id,)
)
row = await cursor.fetchone()
if row:
data = row[0]
return pickle.loads(data)
else:
return None
except Exception as e:
self._logger.error(Exception(f"[SQLite] Get variable error: {e}"), self.__class__.__name__)
def data_type(self) -> type:
"""Gets the data type for the memory slot.
Returns:
Type: The data type for the memory slot.
"""
return self._data_type

@data_type.setter
def data_type(self, value: type):
"""Sets the data type for the memory slot.
Args:
value (Type): The data type for the memory slot.
"""
self._data_type = value

@property
def data(self) -> any:
    """
    The data currently held by the memory slot.

    Returns:
        any: The value of the slot's in-memory ``_data`` attribute.
    """
    return self._data

async def set_data(self, value: any) -> None:
"""Stores the data within SQLite or memory-mapped file."""
Expand All @@ -174,8 +197,40 @@ async def set_data(self, value: any) -> None:
await conn.commit()
except Exception as e:
self._logger.error(Exception(f"[SQLite] Add variable error: {e}"), self.__class__.__name__)

self._data = value
self._update_modified_time()

async def _update_data(self) -> None:
"""Updates the data stored in the memory slot.

Refreshes the in-memory copy (``_data``) from the memory-mapped file or from the SQLite
``variables`` row whenever ``_modified_at`` differs from ``_last_modified``, for as long as
the update operation reports that it is running.
"""
# Poll every 0.1 s until the update operation reports that it is running.
while not self._update_operation.is_running:
await asyncio.sleep(.1)

# While the operation runs, check about every 0.01 s whether a refresh is needed.
while self._update_operation.is_running:
await asyncio.sleep(.01)
# A mismatch means _modified_at has changed since the last successful refresh.
if self._last_modified != self._modified_at:
# Hold the slot lock while reading from storage.
async with self._lock:
if self._use_mmap:
if self._mmapped_file:
# Read the mapped file from its start and unpickle the contents.
# NOTE(review): pickle.loads is only safe if the stored bytes are trusted - confirm.
self._mmapped_file.seek(0)
serialized_data = self._mmapped_file.read()
self._data = pickle.loads(serialized_data)
self._last_modified = self._modified_at
# NOTE(review): presumably this else pairs with `if self._use_mmap` (SQLite path when mmap
# is not in use) - confirm against the properly indented file.
else:
try:
# Fetch this slot's pickled payload from the SQLite database at db_path.
async with aiosqlite.connect(self.db_path) as conn:
cursor = await conn.execute(
"SELECT data FROM variables WHERE memory_id = ?",
(self._memory_id,)
)
row = await cursor.fetchone()
if row:
data = row[0]
self._data = pickle.loads(data)
self._last_modified = self._modified_at
except Exception as e:
# Errors are logged rather than raised; if the failure happened before _last_modified was
# updated, the timestamps still differ and the read is attempted again on the next pass.
self._logger.error(Exception(f"[SQLite] Get variable error: {e}"), self.__class__.__name__)

def _check_data_size(self, _data) -> None:
"""
Checks the size of the data and initializes memory-mapped storage if necessary.
Expand Down
Loading

0 comments on commit 3721ad7

Please sign in to comment.