Skip to content

Commit

Permalink
split TaskManager into TaskCreator & TaskMonitor, new OperationLifecy…
Browse files Browse the repository at this point in the history
…cleManager
  • Loading branch information
lane-neuro committed Jun 3, 2024
1 parent a8a420a commit 1dafc7e
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 230 deletions.
14 changes: 9 additions & 5 deletions neurobehavioral_analytics_suite/gui/OperationManagerDialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class OperationManagerDialog:

SLEEP_DURATION = 0.05
TILE_WIDTH = 300 # Fixed width for each operation tile
TILE_HEIGHT = 200 # Fixed height for each operation tile
TILE_HEIGHT = 300 # Fixed height for each operation tile

def __init__(self, operation_control: OperationControl, logger, tiles_per_row=3):
"""Initializes the OperationManagerDialog with the given operation control, logger, and tile settings.
Expand All @@ -20,7 +20,8 @@ def __init__(self, operation_control: OperationControl, logger, tiles_per_row=3)
logger: Logger instance for logging messages.
tiles_per_row: Number of tiles to display per row.
"""
self.window = dpg.add_group(label="Operation Manager", parent="operation_pane", tag="operation_gallery", horizontal=False)
self.window = dpg.add_group(label="Operation Manager", parent="operation_pane", tag="operation_gallery",
horizontal=False)
self.operation_control = operation_control
self.logger = logger
self.operation_items = {} # Store operation GUI items
Expand Down Expand Up @@ -56,7 +57,8 @@ async def display_operations(self):
for operation_chain in queue_copy:
for node in operation_chain:
if node.operation not in self.operation_items and node.operation.name != "gui_OperationUpdateTask":
self.operation_items[node.operation] = OperationModule(node.operation, self.operation_control, self.logger)
self.operation_items[node.operation] = OperationModule(node.operation, self.operation_control,
self.logger)
await self.operation_items[node.operation].initialize()
self.add_operation_tile(node.operation)
await asyncio.sleep(self.SLEEP_DURATION)
Expand All @@ -68,10 +70,12 @@ def add_operation_tile(self, operation):
operation: The operation to add as a tile.
"""
# Ensure current row group is created
if self.current_row_group is None or len(dpg.get_item_children(self.current_row_group)[1]) >= self.tiles_per_row:
if (self.current_row_group is None
or len(dpg.get_item_children(self.current_row_group)[1]) >= self.tiles_per_row):
self.current_row_group = dpg.add_group(horizontal=True, parent=self.window)
self.logger.debug(f"Created new row group: {self.current_row_group}")
child_window = dpg.add_child_window(width=self.TILE_WIDTH, height=self.TILE_HEIGHT, parent=self.current_row_group)
child_window = dpg.add_child_window(width=self.TILE_WIDTH, height=self.TILE_HEIGHT,
parent=self.current_row_group)
self.logger.debug(f"Created child window: {child_window} in row group: {self.current_row_group}")
self.operation_items[operation].draw(parent=child_window)

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class OperationModule:
"""A class to manage operations and their GUI representation."""

def __init__(self, operation: Operation, operation_control, logger):
def __init__(self, operation, operation_control, logger):
"""Initializes the OperationModule with the given operation, control, and logger.
Args:
Expand Down
122 changes: 23 additions & 99 deletions neurobehavioral_analytics_suite/operation_manager/OperationControl.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""
A module that defines the OperationControl class, which is responsible for managing and executing operations in the
queue. It also handles user input from the console and monitors system resources.
OperationControl Module.
The OperationControl class provides methods for adding operations to the queue, stopping, pausing, and resuming
operations, and getting the status of operations. It also sets up the asyncio event loop and continuously monitors
for tasks.
This module defines the OperationControl class, which is responsible for managing and executing operations in the
queue. It integrates various components like the operation queue, manager, operation_executor, and checker to manage the
lifecycle of operations.
Author: Lane
Copyright: Lane
Expand All @@ -17,139 +16,64 @@
"""

import asyncio
from neurobehavioral_analytics_suite.operation_manager.OperationChain import OperationChain
from neurobehavioral_analytics_suite.operation_manager.OperationExecutor import OperationExecutor
from neurobehavioral_analytics_suite.operation_manager.OperationManager import OperationManager
from neurobehavioral_analytics_suite.operation_manager.OperationStatusChecker import OperationStatusChecker
from neurobehavioral_analytics_suite.operation_manager.PersistentOperationChecker import PersistentOperationChecker
from neurobehavioral_analytics_suite.operation_manager.TaskManager import TaskManager
from neurobehavioral_analytics_suite.utils.ErrorHandler import ErrorHandler
from neurobehavioral_analytics_suite.operation_manager.OperationQueue import OperationQueue
from neurobehavioral_analytics_suite.utils.UserInputManager import UserInputManager
from neurobehavioral_analytics_suite.operation_manager.OperationLifecycleManager import OperationLifecycleManager
from neurobehavioral_analytics_suite.operation_manager.TaskCreator import TaskCreator
from neurobehavioral_analytics_suite.operation_manager.TaskMonitor import TaskMonitor


class OperationControl:
"""
A class for handling the lifecycle of Operation instances.
This class provides methods for starting, executing, pausing, stopping, and resuming operations. It uses an
instance of OperationQueue to manage the queue of operations.
Attributes:
queue (OperationQueue): A queue for storing Operation instances.
error_handler (ErrorHandler): An instance of ErrorHandler to handle any exceptions that occur.
main_loop (asyncio.AbstractEventLoop): The main asyncio event loop.
"""
"""A class for handling the lifecycle of Operation instances."""

def __init__(self, logger, sleep_time: float = 0.15):
"""
Initializes the OperationControl with an OperationQueue and an ErrorHandler instance.
Initializes the OperationControl with various components.
"""
self.logger = logger
self.error_handler = ErrorHandler()
self.main_loop = asyncio.get_event_loop()

self.queue = OperationQueue(logger=self.logger, error_handler=self.error_handler)
self.task_manager = TaskManager(operation_control=self, logger=self.logger, error_handler=self.error_handler,
queue=self.queue)
self.task_creator = TaskCreator(logger=self.logger, queue=self.queue)
self.task_monitor = TaskMonitor(task_creator=self.task_creator, queue=self.queue, logger=self.logger,
error_handler=self.error_handler)

self.console_operation_in_progress = False
self.local_vars = locals()

self.sleep_time = sleep_time

self.operation_manager = OperationManager(operation_control=self, queue=self.queue,
task_manager=self.task_manager, logger=self.logger,
task_creator=self.task_creator, logger=self.logger,
error_handler=self.error_handler)
self.operation_executor = OperationExecutor(operation_control=self, queue=self.queue,
task_manager=self.task_manager, logger=self.logger,
task_creator=self.task_creator, logger=self.logger,
error_handler=self.error_handler)
self.operation_status_checker = OperationStatusChecker(operation_control=self, queue=self.queue)
self.persistent_operation_checker = PersistentOperationChecker(operation_control=self,
operation_manager=self.operation_manager,
queue=self.queue,
task_manager=self.task_manager,
task_creator=self.task_creator,
logger=self.logger,
error_handler=self.error_handler)
self.user_input_handler = UserInputManager(operation_control=self, logger=self.logger,
error_handler=self.error_handler)
self.lifecycle_manager = OperationLifecycleManager(queue=self.queue, operation_manager=self.operation_manager,
executor=self.operation_executor,
task_monitor=self.task_monitor, logger=self.logger,
persistent_op_checker=self.persistent_operation_checker,
error_handler=self.error_handler)

async def start(self):
"""
Starts the operation handler.
"""
"""Starts the operation handler."""
self.main_loop.run_forever()

async def start_operations(self) -> None:
for operation_chain in self.queue.queue:
if isinstance(operation_chain, OperationChain):
current_node = operation_chain.head
while current_node is not None:
operation = current_node.operation
if operation.status == "idle":
operation.init_operation()
await operation.start()
self.logger.info(f"start_operations: [START] {operation.name} - {operation.status}")
current_node = current_node.next_node
else:
operation = operation_chain.operation
if operation.status == "idle":
operation.init_operation()
await operation.start()
self.logger.info(f"start_operations: [START] {operation.name} - {operation.status}")

async def resume_all_operations(self):
"""
Resumes all paused operations in the queue.
"""
for operation_list in self.queue.queue:
operation = self.queue.get_head_operation_from_chain(operation_list)
await self.operation_manager.resume_operation(operation)

async def pause_all_operations(self):
"""
Pauses all operations in the queue.
"""

for operation_list in self.queue.queue:
operation = self.queue.get_head_operation_from_chain(operation_list)
await self.operation_manager.pause_operation(operation)

async def stop_all_operations(self):
"""
Stops all operations in the queue.
"""
for operation_node in self.queue.queue:
if isinstance(operation_node, OperationChain):
current_node = operation_node.head
while current_node is not None:
await self.operation_manager.stop_operation(current_node.operation)
current_node = current_node.next_node
else:
await self.operation_manager.stop_operation(operation_node)

async def exec_loop(self):
"""
Executes the main loop of the operation manager.
"""

self.logger.debug("Starting exec_loop")

while True:
try:
# Check for persistent operations
await self.persistent_operation_checker.check_persistent_operations()

# Start all operations in the queue
await self.start_operations()

# Execute all operations in the queue
await self.operation_executor.execute_ready_operations()

# Handle any completed tasks
await self.task_manager.handle_tasks()

except Exception as e:
self.error_handler.handle_error(e, self)
finally:
await asyncio.sleep(self.sleep_time)
"""Executes the main loop of the operation manager."""
await self.lifecycle_manager.exec_loop()
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
Status: Prototype
"""
import asyncio

from neurobehavioral_analytics_suite.operation_manager.OperationChain import OperationChain
from neurobehavioral_analytics_suite.operation_manager.operation.persistent.ConsoleOperation import ConsoleOperation


class OperationExecutor:
def __init__(self, operation_control, queue, task_manager, logger, error_handler):
def __init__(self, operation_control, queue, task_creator, logger, error_handler):
self.op_control = operation_control
self.queue = queue
self.task_manager = task_manager
self.task_creator = task_creator
self.logger = logger
self.error_handler = error_handler

Expand Down Expand Up @@ -64,7 +63,7 @@ async def execute_ready_operations(self) -> None:
self.logger.debug(f"execute_all: [OP] {operation.name} - {operation.status} - {operation.task}")

if not operation.task and operation.is_ready():
operation.task = await self.task_manager.create_task(self.execute_operation(operation),
operation.task = await self.task_creator.create_task(self.execute_operation(operation),
name=operation.name)
if isinstance(operation, ConsoleOperation):
self.op_control.console_operation_in_progress = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
OperationLifecycleManager Module.
This module defines the OperationLifecycleManager class responsible for managing the overall lifecycle of operations
within the Neurobehavioral Analytics Suite. It handles starting, stopping, pausing, and resuming operations.
Author: Lane
Copyright: Lane
Credits: Lane
License: BSD 3-Clause License
Version: 0.0.0.1
Maintainer: Lane
Email: justlane@uw.edu
Status: Prototype
"""

import asyncio
from neurobehavioral_analytics_suite.operation_manager.OperationChain import OperationChain
from neurobehavioral_analytics_suite.operation_manager.OperationExecutor import OperationExecutor
from neurobehavioral_analytics_suite.operation_manager.OperationQueue import OperationQueue
from neurobehavioral_analytics_suite.operation_manager.TaskMonitor import TaskMonitor
from neurobehavioral_analytics_suite.utils.ErrorHandler import ErrorHandler
from neurobehavioral_analytics_suite.utils.Logger import Logger


class OperationLifecycleManager:
"""Manages the lifecycle of operations."""

def __init__(self, queue: OperationQueue, operation_manager, executor: OperationExecutor,
persistent_op_checker, task_monitor: TaskMonitor, logger: Logger, error_handler: ErrorHandler):
"""
Initializes the OperationLifecycleManager with the given parameters.
Args:
queue: The operation queue.
operation_manager: The operation manager.
executor: The operation operation_executor.
logger: Logger for logging lifecycle-related information.
error_handler: Handler for managing errors.
"""
self.queue = queue
self.operation_manager = operation_manager
self.operation_executor = executor
self.persistent_operation_checker = persistent_op_checker
self.task_monitor = task_monitor
self.logger = logger
self.error_handler = error_handler

async def start_all_operations(self):
"""Starts all operations in the queue."""
for operation_chain in self.queue.queue:
if isinstance(operation_chain, OperationChain):
current_node = operation_chain.head
while current_node is not None:
operation = current_node.operation
if operation.status == "idle":
operation.init_operation()
await operation.start()
self.logger.info(f"start_operations: [START] {operation.name} - {operation.status}")
current_node = current_node.next_node
else:
operation = operation_chain.operation
if operation.status == "idle":
operation.init_operation()
await operation.start()
self.logger.info(f"start_operations: [START] {operation.name} - {operation.status}")

async def stop_all_operations(self):
"""Stops all operations in the queue."""
for operation_node in self.queue.queue:
if isinstance(operation_node, OperationChain):
current_node = operation_node.head
while current_node is not None:
await self.operation_manager.stop_operation(current_node.operation)
current_node = current_node.next_node
else:
await self.operation_manager.stop_operation(operation_node)

async def resume_all_operations(self):
"""Resumes all paused operations in the queue."""
for operation_list in self.queue.queue:
operation = self.queue.get_head_operation_from_chain(operation_list)
await self.operation_manager.resume_operation(operation)

async def pause_all_operations(self):
"""Pauses all operations in the queue."""
for operation_list in self.queue.queue:
operation = self.queue.get_head_operation_from_chain(operation_list)
await self.operation_manager.pause_operation(operation)

async def exec_loop(self):
"""Executes the main loop of the operation manager."""
self.logger.debug("Starting exec_loop")

while True:
try:
await self.persistent_operation_checker.check_persistent_operations()
await self.start_all_operations()
await self.operation_executor.execute_ready_operations()
await self.task_monitor.handle_tasks()
except Exception as e:
self.error_handler.handle_error(e, self)
finally:
await asyncio.sleep(0.15)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@


class OperationManager:
def __init__(self, operation_control, queue, task_manager, logger, error_handler):
def __init__(self, operation_control, queue, task_creator, logger, error_handler):
self.op_control = operation_control
self.queue = queue
self.task_manager = task_manager
self.task_creator = task_creator
self.logger = logger
self.error_handler = error_handler

Expand All @@ -38,7 +38,7 @@ async def add_operation(self, operation_type, *args, **kwargs) -> Operation:
return operation

async def add_operation_if_not_exists(self, operation_type, *args, **kwargs):
if not self.task_manager.task_exists(operation_type):
if not self.task_creator.task_exists(operation_type):
await self.add_operation(operation_type, *args, **kwargs)

async def resume_operation(self, operation: Operation) -> None:
Expand Down
Loading

0 comments on commit 1dafc7e

Please sign in to comment.