Skip to content

Commit

Permalink
closes #173. Improved prefix management. Requested functionality alre…
Browse files Browse the repository at this point in the history
…ady exists. (#175)
  • Loading branch information
elphick authored May 19, 2024
1 parent c4c0e8a commit b8fb961
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 41 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
Mass_Composition 0.6.5 (2024-05-18)
Mass_Composition 0.6.7 (2024-05-19)
===================================

Feature
-------

- Improved prefix management on mc init from dataframe
- Added cleaning code to iron_ore_met_sample_data" method in sample_data
- No changes need to provide additional features to split_by_estimator since attr vars are already
passed to outputs of math operations. (#172)

Mass_Composition 0.6.6 (2024-05-18)
===================================

Feature
Expand Down
2 changes: 2 additions & 0 deletions elphick/mass_composition/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def add_input(self, name: str) -> 'DAG':

def add_step(self, name: str, operation: Callable, streams: List[str], kwargs: dict = None,
defined: bool = True) -> 'DAG':
if name in self.all_nodes_:
raise ValueError(f"A step with the name '{name}' already exists.")
# Determine dependencies from the input streams
dependencies = [self.stream_parent_node[stream] for stream in streams]
self.graph.add_node(name, operation=operation, dependencies=dependencies, kwargs=kwargs, defined=defined)
Expand Down
10 changes: 10 additions & 0 deletions elphick/mass_composition/datasets/sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ def iron_ore_met_sample_data() -> pd.DataFrame:
col.strip().lower().replace(' ', '_').replace('(', '').replace(')', '').replace('%', 'pct').replace('__', '_')
for
col in df_met.columns]

# clean up some values and types
df_met = df_met.replace('-', np.nan).replace('#VALUE!', np.nan)
head_cols: List[str] = [col for col in df_met.columns if 'head' in col]
df_met[head_cols] = df_met[head_cols].astype('float64')
df_met['bulk_hole_no'] = df_met['bulk_hole_no'].astype('category')
df_met['sample_number'] = df_met['sample_number'].astype('int64')
df_met.set_index('sample_number', inplace=True)

# moves suffixes to prefix
df_met = df_met.pipe(_move_suffix_to_prefix, '_head')
df_met = df_met.pipe(_move_suffix_to_prefix, '_lump')
return df_met
Expand Down
60 changes: 43 additions & 17 deletions elphick/mass_composition/mass_composition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
from collections import Counter
from copy import deepcopy
from pathlib import Path
from typing import Dict, List, Optional, Union, Tuple, Iterable, Callable, Set, Literal, Any
Expand Down Expand Up @@ -75,14 +76,36 @@ def __init__(self,
self.status: Optional[Status] = None

if data is not None:
data = deepcopy(data) # preserve the incoming data variable.
self.set_data(data, constraints=constraints)

@staticmethod
def _strip_common_prefix(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
common_prefix = os.path.commonprefix(df.columns.to_list())
stripped_df = df.copy()
stripped_df.columns = [col.replace(common_prefix, '') for col in df.columns]
return stripped_df, common_prefix
# Extract prefixes
common_prefix = MassComposition.get_common_prefix(df.columns.to_list())

res = df
# Create a copy of the dataframe and strip the most common prefix from column names
if common_prefix:
res = df.copy()
res.columns = [col.replace(common_prefix + '_', '') if col.startswith(common_prefix) else col for col in
df.columns]

return res, common_prefix

@staticmethod
def get_common_prefix(columns: List[str]) -> str:
prefixes = [col.split('_')[0] for col in columns]
# Count the frequency of each prefix
prefix_counter = Counter(prefixes)
# Check if prefix_counter is not empty
if prefix_counter:
# Find the most common prefix
common_prefix, freq = prefix_counter.most_common(1)[0]
# Only return the prefix if its frequency is 3 or more
if freq >= 3:
return common_prefix
return ""

def set_data(self, data: Union[pd.DataFrame, xr.Dataset],
constraints: Optional[Dict[str, List]] = None):
Expand All @@ -104,7 +127,8 @@ def set_data(self, data: Union[pd.DataFrame, xr.Dataset],
# seek a prefix to self assign the name
data, common_prefix = self._strip_common_prefix(data)
if common_prefix:
self._specified_columns = {k: v.replace(common_prefix, '') for k, v in self._specified_columns.items()
self._specified_columns = {k: v.replace(f"{common_prefix}_", '') for k, v in
self._specified_columns.items()
if v is not None}

self.variables = Variables(config=self.config['vars'],
Expand Down Expand Up @@ -630,24 +654,24 @@ def split_by_estimator(self,
"""
# Extract feature names from the estimator, and get the actual features
feature_names: list[str] = list(extract_feature_names(estimator))
features: pd.DataFrame = self._get_features(feature_names, extra_features, allow_prefix_mismatch)
features: pd.DataFrame = self._get_features(feature_names, allow_prefix_mismatch=allow_prefix_mismatch,
extra_features=extra_features)

# Apply the estimator
estimates: pd.DataFrame = estimator.predict(X=features)
if isinstance(estimates, np.ndarray):
raise NotImplementedError("The estimator must return a DataFrame")

# Detect a possible prefix from the estimate columns
features_prefix: str = os.path.commonprefix(features.columns.to_list())
estimates_prefix: str = os.path.commonprefix(estimates.columns.to_list())
features_prefix: str = self.get_common_prefix(features.columns.to_list())
estimates_prefix: str = self.get_common_prefix(estimates.columns.to_list())

# If there is a prefix, check that it matches name_1, subject to allow_prefix_mismatch
if estimates_prefix.strip(
'_') and not allow_prefix_mismatch and name_1 and not name_1 == estimates_prefix.strip('_'):
if estimates_prefix and not allow_prefix_mismatch and name_1 and not name_1 == estimates_prefix:
raise ValueError(f"Common prefix mismatch: {features_prefix} and name_1: {name_1}")

# assign the output names, based on specified names, allow for prefix mismatch
name_1 = name_1 if name_1 else estimates_prefix.strip('_')
name_1 = name_1 if name_1 else estimates_prefix

if mass_recovery_column:
# Transform the mass recovery to mass by applying the mass recovery to the dry mass of the input stream
Expand All @@ -661,7 +685,9 @@ def split_by_estimator(self,
dry_mass_var].values / mass_recovery_max
estimates.rename(columns={mass_recovery_column: dry_mass_var}, inplace=True)

estimates.columns = [f.replace(estimates_prefix, "") for f in estimates.columns]
if estimates_prefix:
col_name_map: dict[str, str] = {f: f.replace(estimates_prefix + '_', "") for f in estimates.columns}
estimates.rename(columns=col_name_map, inplace=True)

out: MassComposition = MassComposition(name=name_1, constraints=self.constraints, data=estimates)
comp: MassComposition = self.sub(other=out, name=name_2)
Expand All @@ -671,7 +697,7 @@ def split_by_estimator(self,
return out, comp

def _get_features(self, feature_names: List[str], allow_prefix_mismatch: bool,
extra_features: Optional[pd.DataFrame] = None,) -> pd.DataFrame:
extra_features: Optional[pd.DataFrame] = None, ) -> pd.DataFrame:
"""
This method checks if the feature names required by an estimator are present in the data. If not, it tries to
match the feature names by considering a common prefix. If a match is found, the columns in the data are renamed
Expand All @@ -696,16 +722,16 @@ def _get_features(self, feature_names: List[str], allow_prefix_mismatch: bool,
feature_name_map = {name.lower(): name for name in feature_names}

df_features: pd.DataFrame = self.data.to_dataframe()
if extra_features:
if extra_features is not None:
df_features = pd.concat([df_features, extra_features], axis=1)

missing_features = set(f.lower() for f in feature_names) - set(c.lower() for c in df_features.columns)

if missing_features:
prefix: str = f"{self.name}_"
common_prefix: str = os.path.commonprefix(feature_names)
if common_prefix and common_prefix != prefix and allow_prefix_mismatch:
prefix = common_prefix
common_prefix: str = self.get_common_prefix(feature_names)
if common_prefix and common_prefix + '_' != prefix and allow_prefix_mismatch:
prefix = common_prefix + '_'

# create a map to support renaming the columns
prefixed_feature_map: dict[str, str] = {f: feature_name_map.get(f"{prefix}{f.lower()}") for f in
Expand Down
67 changes: 46 additions & 21 deletions examples/504_dag_with_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# This import at the top to guard against the estimator extras not being installed
from elphick.mass_composition.utils.sklearn import PandasPipeline

import numpy as np
import pandas as pd
import plotly
from sklearn.ensemble import RandomForestRegressor
Expand All @@ -39,18 +38,19 @@
# ---------
#
# We load some metallurgical data from a drill program, REF: A072391
# Since we are not concerned about the model performance in this example, we'll convert the categorical feature
# bulk_hole_no to an integer

df: pd.DataFrame = iron_ore_met_sample_data()

base_components = ['fe', 'p', 'sio2', 'al2o3', 'loi']
cols_x = ['dry_weight_lump_kg'] + [f'head_{comp}' for comp in base_components]
cols_x = ['dry_weight_lump_kg'] + [f'head_{comp}' for comp in base_components] + ['bulk_hole_no']
cols_y = ['lump_pct'] + [f'lump_{comp}' for comp in base_components]

# %%
df = df.loc[:, ['sample_number'] + cols_x + cols_y].query('lump_pct>0').replace('-', np.nan).astype(float).dropna(
how='any')
df = df.rename(columns={'dry_weight_lump_kg': 'head_mass_dry'}).set_index('sample_number')
df.index = df.index.astype(int)
df = df.loc[:, cols_x + cols_y].query('lump_pct>0').dropna(how='any')
df = df.rename(columns={'dry_weight_lump_kg': 'head_mass_dry'})
df['bulk_hole_no'] = df['bulk_hole_no'].astype('category').cat.codes

logger.info(df.shape)
df.head()

Expand Down Expand Up @@ -86,40 +86,65 @@
# ------------------------------------
# Now we will create a MassComposition object and use it to apply the model to the feed stream.

head: MassComposition = MassComposition(data=X[[col for col in X.columns if 'head' in col]],
head: MassComposition = MassComposition(data=X_test.drop(columns=['bulk_hole_no']), name='head',
mass_dry_var='head_mass_dry')
lump, fines = head.split_by_estimator(estimator=pipe, name_2='fines',
mass_recovery_column='lump_pct', mass_recovery_max=100)
mass_recovery_column='lump_pct', mass_recovery_max=100,
extra_features=X_test['bulk_hole_no'])
lump.data.to_dataframe().head()

lump
# %%
fines
fines.data.to_dataframe().head()

# %%
# Define the DAG
# --------------
#
# The DAG is defined by adding nodes to the graph. Each node is an input, output or Stream operation
# (e.g. add, split, etc.). The nodes are connected by the streams they operate on.
# First we define a simple DAG, where the feed stream is split into two streams, lump and fines.
# The lump estimator requires the usual mass-composition variables plus an addition feature/variable
# called `bulk_hole_no`. Since the `bulk_hole_no` is available in the feed stream, it is immediately accessible
# to the estimator.

head: MassComposition = MassComposition(data=X_test, name='head',
mass_dry_var='head_mass_dry')

dag = DAG(name='A072391', n_jobs=2)
dag = DAG(name='A072391', n_jobs=1)
dag.add_input(name='head')
dag.add_step(name='screen', operation=Stream.split_by_estimator, streams=['head'],
kwargs={'estimator': pipe, 'name_1': 'lump', 'name_2': 'fines',
'mass_recovery_column': 'lump_pct', 'mass_recovery_max': 100})
dag.add_output(name='lump', stream='lump')
dag.add_output(name='fines', stream='fines')
dag.run(input_streams={'head': head}, progress_bar=True)

fig = Flowsheet.from_dag(dag).plot_network()
fig

# %%
# Run the DAG
# -----------
# More Complex DAG
# ----------------
# This DAG is to test a more complex flowsheet where the estimator may have all the features
# immediately available in the parent stream.
#
# The dag is run by providing a Stream object for the input.
# .. note::
# This example works, but it does so since all attribute (extra) variables are passed all the way around
# the network in the current design. This is to be changed in the future to allow for more efficient processing.
# Once attributes are no longer passed, changes will be needed to the DAG to marshall
# features from other streams in the network (most often the input stream).

dag.run({'head': head}, progress_bar=True)

# %%
# Create a Flowsheet object from the dag, enabling all the usual network plotting and analysis methods.
dag = DAG(name='A072391', n_jobs=1)
dag.add_input(name='head')
dag.add_step(name='screen', operation=Stream.split_by_estimator, streams=['head'],
kwargs={'estimator': pipe, 'name_1': 'lump', 'name_2': 'fines',
'mass_recovery_column': 'lump_pct', 'mass_recovery_max': 100})
dag.add_step(name='screen_2', operation=Stream.split_by_estimator, streams=['fines'],
kwargs={'estimator': pipe, 'name_1': 'lump_2', 'name_2': 'fines_2',
'mass_recovery_column': 'lump_pct', 'mass_recovery_max': 100,
'allow_prefix_mismatch': True})
dag.add_output(name='lump', stream='lump_2')
dag.add_output(name='fines', stream='fines_2')
dag.add_output(name='stockpile', stream='lump')
dag.run(input_streams={'head': head}, progress_bar=True)

fs: Flowsheet = Flowsheet.from_dag(dag)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mass-composition"
version = "0.6.6"
version = "0.6.7"
description = "For managing multi-dimensional mass-composition datasets, supporting weighted mathematical operations and visualisation."
authors = ["Greg <greg@elphick.com.au>"]
packages = [{ include = "elphick/mass_composition" }]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ def test_names(demo_data):
xr_data: xr.Dataset = obj_mc._data
df_export: pd.DataFrame = xr_data.mc.to_dataframe(original_column_names=True)
for col in demo_data.columns:
assert col in list(df_export.columns), f'{col} is not in {list(demo_data.columns)}'
assert col in df_export.columns.to_list(), f'{col} is not in {demo_data.columns.to_list()}'

0 comments on commit b8fb961

Please sign in to comment.