Skip to content

Commit

Permalink
fixes #151, DAG parallel working. Added progressbar. (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
elphick authored May 15, 2024
1 parent 523328c commit 9dc617d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 79 deletions.
15 changes: 15 additions & 0 deletions HISTORY.rst → CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
Mass_Composition 0.5.2 (2024-05-16)
===================================

Bugfix
------

- Fixed error when setting n_jobs > 1 in DAG. (#151)


Feature
-------

- Added progressbar to DAG. (#151)


Mass_Composition 0.5.1 (2024-05-15)
===================================

Expand Down
113 changes: 42 additions & 71 deletions elphick/mass_composition/dag.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from typing import List, Callable, Union, Optional, Tuple
from typing import List, Callable, Union, Tuple

import matplotlib.pyplot as plt
import networkx as nx
from joblib import Parallel, delayed
from tqdm import tqdm

from elphick.mass_composition import Stream, MassComposition

Expand All @@ -26,35 +27,6 @@ def __init__(self, name: str = 'DAG', n_jobs=-1):
self.graph = nx.DiGraph()
self.node_executed = {} # Store the execution state of nodes

# @property
# def mass_compositions(self) -> dict[str, Stream]:
# """
# Retrieves all the Stream objects associated with the nodes in the DAG.
#
# This property iterates over all the nodes in the DAG, checks if the node has been executed,
# and if so, retrieves the result of the node execution. If the node has not been executed,
# it retrieves the Stream object associated with the node in the graph.
#
# Returns:
# dict[str, Stream]: A dictionary where the keys are node names and the values
# are Stream objects associated with those nodes.
# """
# mass_compositions = {}
# for node in self.graph.nodes:
# if node in self.node_executed:
# result = self.node_executed[node]
# if isinstance(result, Stream):
# mass_compositions[node] = result
# elif isinstance(result, tuple) and all(isinstance(r, Stream) for r in result):
# for r in result:
# mass_compositions[r.name] = r
# else:
# # If the node is not in the results dictionary, it is a leaf node
# # Retrieve its result from the Stream objects associated with the node in the graph
# mc = self.graph.nodes[node]['mc']
# mass_compositions[node] = mc
# return mass_compositions

@property
def streams(self):
"""
Expand Down Expand Up @@ -93,7 +65,8 @@ def add_input(self, name: str) -> 'DAG':
self.stream_parent_node[name] = name
return self

def add_step(self, name: str, operation: Callable, streams: List[str], kwargs: dict = None, defined: bool = True) -> 'DAG':
def add_step(self, name: str, operation: Callable, streams: List[str], kwargs: dict = None,
defined: bool = True) -> 'DAG':
# Determine dependencies from the input streams
dependencies = [self.stream_parent_node[stream] for stream in streams]
self.graph.add_node(name, operation=operation, dependencies=dependencies, kwargs=kwargs, defined=defined)
Expand Down Expand Up @@ -133,13 +106,17 @@ def run(self, input_streams: dict):
Returns:
None
"""
logger.info("Running the DAG") # Log the node that is being executed
logger.info("Preparing the DAG")
self._finalize()
logger.info("Executing the DAG")

# Initialize the execution state of all nodes to False
for node in self.graph.nodes:
self.node_executed[node] = False

# Initialise a progressbar that will count up to the number of nodes in the graph
pbar = tqdm(total=len(self.graph.nodes), desc="Executing nodes", unit="node")

executed_nodes = set() # Keep track of nodes that have been executed

while len(executed_nodes) < len(self.graph):
Expand All @@ -159,7 +136,7 @@ def run(self, input_streams: dict):
logger.debug(f"Node {node} is waiting for {predecessors}")

# Create a job for each ready node
jobs = [delayed(self.execute_node)(node, input_streams, executed_nodes) for node in ready_nodes]
jobs = [delayed(self.execute_node)(node, input_streams) for node in ready_nodes]

# Execute the jobs in parallel
if jobs:
Expand All @@ -169,11 +146,21 @@ def run(self, input_streams: dict):
else:
results = []

# Update executed_nodes with the returned value of each job
for i, result in enumerate(results):
executed_nodes.add(ready_nodes[i])

def execute_node(self, node: str, strms: dict, executed_nodes: set) -> Optional[Union[Stream, Tuple[Stream, ...]]]:
# Update executed_nodes and self.graph.edges with the returned values
for node, result, updated_edges in results:
executed_nodes.add(node)
self.node_executed[node] = True
for edge, strm in updated_edges.items():
logger.debug(f"Updating edge {edge} with stream {strm.name}")
self.graph.edges[edge]['mc'] = strm
# update the progress bar by one step
pbar.set_postfix_str(f"Processed node: {node}")
pbar.update()

pbar.close() # Close the progress bar
logger.info(f"DAG execution complete for the nodes: {executed_nodes}")

def execute_node(self, node: str, strms: dict):
"""
Executes a node in the DAG.
Expand All @@ -188,7 +175,7 @@ def execute_node(self, node: str, strms: dict, executed_nodes: set) -> Optional[
Returns:
Union[Stream, Tuple[Stream, ...]]: The result of the node execution, or None if the node is waiting for its predecessors.
"""
logger.info(f"Executing node {node}") # Log the node that is being executed
logger.debug(f"Executing node {node}") # Log the node that is being executed
operation = self.graph.nodes[node]['operation']
kwargs = self.graph.nodes[node]['kwargs']
defined = self.graph.nodes[node]['defined']
Expand Down Expand Up @@ -220,50 +207,34 @@ def execute_node(self, node: str, strms: dict, executed_nodes: set) -> Optional[
# In this case, execute the operation with the results of its dependencies as inputs
# Check if the results of the predecessors are available

if all(self.node_executed[dependency] for dependency in self.graph.predecessors(node)):
inputs = [self.graph.get_edge_data(*edge)['mc'] for edge in self.graph.in_edges(node)]
# If only one input stream is provided, retrieve the corresponding Stream object
if len(inputs) == 1:
inputs = inputs[0]
# Check if kwargs is not None before passing it to the operation
result = operation(inputs, **kwargs) if kwargs is not None else operation(inputs)
else:
# Ensure inputs is always an iterable
if isinstance(inputs, Stream):
inputs = [inputs]
# Check if kwargs is not None before passing it to the operation
result = operation(*inputs, **kwargs) if kwargs is not None else operation(*inputs)
inputs = [self.graph.get_edge_data(*edge)['mc'] for edge in self.graph.in_edges(node)]
# If only one input stream is provided, retrieve the corresponding Stream object
if len(inputs) == 1:
inputs = inputs[0]
# Check if kwargs is not None before passing it to the operation
result = operation(inputs, **kwargs) if kwargs is not None else operation(inputs)
else:
logger.debug(f"Waiting for predecessors of node {node}")
return None
# Ensure inputs is always an iterable
if isinstance(inputs, Stream):
inputs = [inputs]
# Check if kwargs is not None before passing it to the operation
result = operation(*inputs, **kwargs) if kwargs is not None else operation(*inputs)

except AttributeError as e:
logger.error(f"Error while executing node {node}: {e}")
raise

# If the node has successors and is defined, store the result of the node execution in the edges of the graph
# Return the node, result, and the updated edges
updated_edges = {}
if list(self.graph.successors(node)) and defined:
if isinstance(result, tuple):
for i, strm in enumerate(result):
self.graph.edges[(node, list(self.graph.successors(node))[i])]['mc'] = strm
logger.debug(f"Stored results for stream {strm.name}") # Log the node for which a result was stored
updated_edges[(node, list(self.graph.successors(node))[i])] = strm
else:
for successor in self.graph.successors(node):
self.graph.edges[(node, successor)]['mc'] = result
logger.debug(f"Stored results for stream {node}") # Log the node for which a result was stored

# After executing the operation, update the execution state of the node
executed_nodes.add(node)
self.node_executed[node] = True # Update the execution state in the self.node_executed dictionary

# Log the state of the self.streams dictionary
logger.debug(f"State of self.streams after executing node {node}: {self.streams}")

# Ensure the result is always a tuple
if isinstance(result, Stream):
return (result, None)
updated_edges[(node, successor)] = result

return result
return node, result, updated_edges

def plot(self):
plt.figure(figsize=(8, 6))
Expand Down
8 changes: 4 additions & 4 deletions examples/502_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
from elphick.mass_composition.datasets.sample_data import sample_data
from elphick.mass_composition.flowsheet import Flowsheet

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# %%
# Define the DAG
# --------------
#
# The DAG is defined by adding nodes to the graph. Each node is a MassComposition operation
# (or a DAG.input or DAG_output).
# The DAG is defined by adding nodes to the graph. Each node is an input, output or Stream operation
# (e.g. add, split, etc.). The nodes are connected by the streams they operate on.

mc_sample: MassComposition = MassComposition(sample_data(), name='sample')

Expand All @@ -48,7 +48,7 @@
# Run the DAG
# -----------
#
# The dag is run by providing MassComposition objects for all inputs. They must be compatible i.e. have the
# The dag is run by providing MassComposition (or Stream) objects for all inputs. They must be compatible i.e. have the
# same indexes.

dag.run({'feed_1': mc_sample,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mass-composition"
version = "0.5.1"
version = "0.5.2"
description = "For managing multi-dimensional mass-composition datasets, supporting weighted mathematical operations and visualisation."
authors = ["Greg <greg@elphick.com.au>"]
packages = [{ include = "elphick/mass_composition" }]
Expand Down
9 changes: 6 additions & 3 deletions scripts/bump_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@


def run_command(command):
    """Run *command* in a shell, wait for it to finish, and echo its output.

    Captures stdout and stderr via pipes and prints both, so the caller
    sees the subprocess output even when it is buffered.

    Args:
        command: The shell command line to execute.

    Returns:
        None

    NOTE(review): shell=True passes the string to the system shell —
    only ever call this with trusted, programmer-supplied commands.
    """
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() reads both pipes to EOF and waits for the process to
    # exit, so no separate process.wait() is needed (it was redundant).
    stdout, stderr = process.communicate()
    print(f"stdout: {stdout.decode()}")
    print(f"stderr: {stderr.decode()}")


Expand All @@ -21,7 +24,7 @@ def process_command_line_parameters():


def adjust_changelog():
with open('HISTORY.rst', 'r') as file:
with open('CHANGELOG.rst', 'r') as file:
lines = file.readlines()

# Remove 'Elphick.' prefix from the first line
Expand All @@ -33,7 +36,7 @@ def adjust_changelog():
if lines[1].startswith('='):
lines[1] = '=' * (len(lines[0].strip())) + '\n' # -1 for the newline character

with open('HISTORY.rst', 'w') as file:
with open('CHANGELOG.rst', 'w') as file:
file.writelines(lines)


Expand Down

0 comments on commit 9dc617d

Please sign in to comment.