Commit

add mlfoundry
khuyentran1401 committed Jan 23, 2022
1 parent 4777378 commit e3cd189
Showing 14 changed files with 199 additions and 209 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@ scraping/ghibli.ipynb

# VSCode workspace
*-workspace
.vscode

# C extensions
*.so
29 changes: 13 additions & 16 deletions .pre-commit-config.yaml
@@ -1,16 +1,13 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/ambv/black
rev: stable
hooks:
- id: black
language_version: python3.7
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.0.0
hooks:
- id: flake8
- repo: https://github.com/timothycrosley/isort
rev: 4.3.21
hooks:
- id: isort
repos:
- repo: https://github.com/ambv/black
rev: 20.8b1
hooks:
- id: black
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.4
hooks:
- id: flake8
- repo: https://github.com/timothycrosley/isort
rev: 5.7.0
hooks:
- id: isort
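
These hooks are typically activated once with pre-commit install, after which black, flake8, and isort run against staged files on every commit; pinning exact rev values (instead of the mutable stable tag) keeps the hook versions reproducible.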
3 changes: 3 additions & 0 deletions data_science_tools/mlfoundry_example/.gitignore
@@ -0,0 +1,3 @@
mlf
output
servicefoundry
Binary file not shown.
57 changes: 26 additions & 31 deletions data_science_tools/mlfoundry_example/data_engineering.py
@@ -1,41 +1,34 @@
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

from typing import Any, Dict, List

import mlfoundry as mlf
import pandas as pd
from mlfoundry.mlfoundry_run import MlFoundryRun
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OrdinalEncoder


# ---------------------------------------------------------------------------- #
# Create tasks #
# ---------------------------------------------------------------------------- #
@task
def load_data(path: str) -> pd.DataFrame:
return pd.read_csv(path)


@task(target="{date:%a_%b_%d_%Y_%H-%M-%S}/{task_name}_output", result = LocalResult(dir='data/processed'))
def get_classes(data: pd.DataFrame, target_col: str) -> List[str]:
"""Task for getting the classes from the Iris data set."""
return sorted(data[target_col].unique())


@task
def encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:
"""Task for encoding the categorical columns in the Iris data set."""

return pd.get_dummies(data, columns=[target_col], prefix="", prefix_sep="")
enc = OrdinalEncoder()
data[target_col] = enc.fit_transform(data[[target_col]])
return data


@task(log_stdout=True, target="{date:%a_%b_%d_%Y_%H-%M-%S}/{task_name}_output", result = LocalResult(dir='data/processed'))
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
def split_data(
data: pd.DataFrame, target_col: str, test_data_ratio: float, mlf_run: MlFoundryRun
) -> Dict[str, Any]:
"""Task for splitting the classical Iris data set into training and test
sets, each split into features and labels.
"""

print(f"Splitting data into training and test sets with ratio {test_data_ratio}")

X, y = data.drop(columns=classes), data[classes]
X, y = data.drop(columns=target_col), data[target_col]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_data_ratio)

# When returning many variables, it is a good practice to give them names:
@@ -47,22 +40,24 @@ def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dic
)


# ---------------------------------------------------------------------------- #
# Create a flow #
# ---------------------------------------------------------------------------- #
def data_engineer_flow(mlf_run: MlFoundryRun):

with Flow("data-engineer") as flow:

# Define parameters
target_col = 'species'
test_data_ratio = Parameter("test_data_ratio", default=0.2)
params = {"target_col": "species", "test_data_ratio": 0.2}

# Log parameters
mlf_run.log_params(params)

# Define tasks
data = load_data(path="data/raw/iris.csv")
classes = get_classes(data=data, target_col=target_col)
categorical_columns = encode_categorical_columns(data=data, target_col=target_col)
train_test_dict = split_data(data=categorical_columns, test_data_ratio=test_data_ratio, classes=classes)
categorical_columns = encode_categorical_columns(
data=data, target_col=params["target_col"]
)
train_test_dict = split_data(
data=categorical_columns,
target_col=params["target_col"],
test_data_ratio=params["test_data_ratio"],
mlf_run=mlf_run,
)

# flow.visualize()
flow.run()
# flow.register(project_name="Iris Project")
return train_test_dict
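
A minimal sketch (not part of the commit) of how this flow is driven, mirroring the client calls in main.py below; the dictionary keys are an assumption taken from what data_science.py reads back.

import mlfoundry as mlf

from data_engineering import data_engineer_flow

# Create a single run to collect parameters, datasets, and metrics (as in main.py)
client = mlf.get_client()
run = client.create_run(project_name="Iris-project")

# Execute the Prefect flow; it logs target_col and test_data_ratio to the run
# and returns the train/test splits (key names assumed from data_science.py)
splits = data_engineer_flow(run)
train_x, train_y = splits["train_x"], splits["train_y"]
test_x, test_y = splits["test_x"], splits["test_y"]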
212 changes: 122 additions & 90 deletions data_science_tools/mlfoundry_example/data_science.py
@@ -1,114 +1,146 @@
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

import mlfoundry as mlf
import numpy as np
import pandas as pd

import mlfoundry as mlf

@task
def setup_mlf():
mlf_api = mlf.set_tracking_uri()
return mlf_api.create_run(project_name="Iris-project")
import shap
from mlfoundry.mlfoundry_run import MlFoundryRun
from sklearn.metrics import accuracy_score, f1_score
from sklearn.neighbors import KNeighborsClassifier


# ---------------------------------------------------------------------------- #
# Create tasks #
# ---------------------------------------------------------------------------- #
@task(log_stdout=True)
def train_model(
train_x: pd.DataFrame, train_y: pd.DataFrame, num_train_iter: int, learning_rate: float) -> np.ndarray:
"""Task for training a simple multi-class logistic regression model. The
number of training iterations as well as the learning rate are taken from
conf/project/parameters.yml. All of the data as well as the parameters
will be provided to this function at the time of execution.
"""
num_iter = num_train_iter
lr = learning_rate
train_x: pd.DataFrame,
train_y: pd.Series,
n_neighbors: int,
mlf_run: MlFoundryRun,
) -> np.ndarray:

X = train_x.to_numpy()
Y = train_y.to_numpy()

# Add bias to the features
bias = np.ones((X.shape[0], 1))
X = np.concatenate((bias, X), axis=1)

weights = []
# Train one model for each class in Y
for k in range(Y.shape[1]):
# Initialise weights
theta = np.zeros(X.shape[1])
y = Y[:, k]
for _ in range(num_iter):
z = np.dot(X, theta)
h = _sigmoid(z)
gradient = np.dot(X.T, (h - y)) / y.size
theta -= lr * gradient
# Save the weights for each model
weights.append(theta)
# Create a new model instance
knn = KNeighborsClassifier(n_neighbors=n_neighbors)

# Train the model
knn.fit(X, Y)

# Print finishing training message
print("Finish training the model.")

# Return a joint multi-class model with weights for all classes
return np.vstack(weights).transpose()


def _sigmoid(z):
"""A helper sigmoid function used by the training and the scoring tasks."""
return 1 / (1 + np.exp(-z))

@task
def predict(model: np.ndarray, test_x: pd.DataFrame) -> np.ndarray:
"""Task for making predictions given a pre-trained model and a test set."""
X = test_x.to_numpy()

# Add bias to the features
bias = np.ones((X.shape[0], 1))
X = np.concatenate((bias, X), axis=1)

# Predict "probabilities" for each class
result = _sigmoid(np.dot(X, model))

# Return the index of the class with max probability for all samples
return np.argmax(result, axis=1)


@task(log_stdout=True)
def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:
"""Task for reporting the accuracy of the predictions performed by the
previous task. Notice that this function has no outputs, except logging.
"""
# Get true class index
target = np.argmax(test_y.to_numpy(), axis=1)
# Calculate accuracy of predictions
accuracy = np.sum(predictions == target) / target.shape[0]
# Log the accuracy of the model
print(f"Model accuracy on test set: {round(accuracy * 100, 2)}")
# Log model
mlf_run.log_model(knn, mlf.ModelFramework.SKLEARN)

return knn


def predict(model: np.ndarray, X: pd.DataFrame) -> np.ndarray:
"""Make predictions given a pre-trained model and a test set."""
X = X.to_numpy()

return {"predictions": model.predict(X)}


def get_shap_values(model, X_train: pd.DataFrame, X_test: pd.DataFrame):
explainer = shap.KernelExplainer(model.predict_proba, X_train)
return explainer.shap_values(X_test)


def log_data_stats(
train_x: pd.DataFrame,
test_x: pd.DataFrame,
train_y: pd.Series,
test_y: pd.Series,
model,
mlf_run: MlFoundryRun,
):
prediction_train = pd.DataFrame(predict(model, train_x))
prediction_test = pd.DataFrame(predict(model, test_x))

train_data = pd.concat([train_x, train_y], axis=1).reset_index(drop=True)
test_data = pd.concat([test_x, test_y], axis=1).reset_index(drop=True)

# Log data
mlf_run.log_dataset(train_data, data_slice=mlf.DataSlice.TRAIN)
mlf_run.log_dataset(test_data, data_slice=mlf.DataSlice.TEST)

# Concat data and predictions
train_df = pd.concat(
[
train_data,
prediction_train,
],
axis=1,
)
test_df = pd.concat(
[
test_data,
prediction_test,
],
axis=1,
)

# Get SHAP values
shap_values = get_shap_values(model, train_x, test_x)

# Log dataset stats
data_schema = mlf.Schema(
feature_column_names=list(train_df.columns),
actual_column_name="species",
prediction_column_name="predictions",
)

mlf_run.log_dataset_stats(
train_df,
data_slice=mlf.DataSlice.TRAIN,
data_schema=data_schema,
model_type=mlf.ModelType.MULTICLASS_CLASSIFICATION,
        # shap_values=shap_values  # ! Uncommenting this gives an error: Details: [Errno 2] No such file or directory: './resources/failure.png'
)
mlf_run.log_dataset_stats(
test_df,
data_slice=mlf.DataSlice.TEST,
data_schema=data_schema,
model_type=mlf.ModelType.MULTICLASS_CLASSIFICATION,
# shap_values=shap_values
)

log_metrics(prediction_test["predictions"], test_y, mlf_run)


def log_metrics(
predictions: np.ndarray, test_y: pd.DataFrame, mlf_run: MlFoundryRun
) -> None:

target = test_y.to_numpy()

# Get metrics
metrics = {}
metrics["accuracy"] = accuracy_score(target, predictions)
metrics["f1_score"] = f1_score(target, predictions, average="weighted")

# Log metrics
mlf_run.log_metrics(metrics)


# ---------------------------------------------------------------------------- #
# Create a flow #
# ---------------------------------------------------------------------------- #

with Flow("data-science") as flow:


train_test_dict = LocalResult(dir='data/processed/Mon_Dec_20_2021_20:55:20').read(location='split_data_output').value
def data_science_flow(train_test_dict: dict, mlf_run: MlFoundryRun):

# Load data
train_x = train_test_dict['train_x']
train_y = train_test_dict['train_y']
test_x = train_test_dict['test_x']
test_y = train_test_dict['test_y']
train_x = train_test_dict["train_x"]
train_y = train_test_dict["train_y"]
test_x = train_test_dict["test_x"]
test_y = train_test_dict["test_y"]

# Define parameters
num_train_iter = Parameter('num_train_iter', default=10000)
learning_rate = Parameter('learning_rate', default = 0.01)

# Define tasks
model = train_model(train_x, train_y, num_train_iter, learning_rate)
predictions = predict(model, test_x)
report_accuracy(predictions, test_y)
params = {"n_neighbors": 12}

# Log parameters
mlf_run.log_params(params)

# Define tasks
model = train_model(train_x, train_y, params["n_neighbors"], mlf_run)

flow.run()
log_data_stats(train_x, test_x, train_y, test_y, model, mlf_run)
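
For reference, a self-contained sketch of the modelling and evaluation logic above with Prefect and mlfoundry stripped out; the scikit-learn iris loader and the 0.2 split ratio are assumptions, while n_neighbors=12 and the weighted F1 score match the parameters used in this file.

# Standalone sketch of the KNN training and metric computation (assumed data source)
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

X, y = load_iris(return_X_y=True, as_frame=True)
train_x, test_x, train_y, test_y = train_test_split(X, y, test_size=0.2)

knn = KNeighborsClassifier(n_neighbors=12)  # same n_neighbors as logged above
knn.fit(train_x.to_numpy(), train_y.to_numpy())

predictions = knn.predict(test_x.to_numpy())
print("accuracy:", accuracy_score(test_y, predictions))
print("f1_score:", f1_score(test_y, predictions, average="weighted"))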
19 changes: 9 additions & 10 deletions data_science_tools/mlfoundry_example/main.py
@@ -1,12 +1,11 @@
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
import mlfoundry as mlf
from data_engineering import data_engineer_flow
from data_science import data_science_flow

data_engineering_flow = StartFlowRun(
flow_name="data-engineer", project_name='Iris Project', wait=True, parameters={'test_data_ratio': 0.3})
data_science_flow = StartFlowRun(
flow_name="data-science", project_name='Iris Project', wait=True)
# Initialize a new MLFoundryRun
mlf_api = mlf.get_client()
mlf_run = mlf_api.create_run(project_name="Iris-project")

with Flow("main-flow") as flow:
result = data_science_flow(upstream_tasks=[data_engineering_flow])

flow.run()
# Run flows
train_test_dict = data_engineer_flow(mlf_run)
data_science_flow(train_test_dict, mlf_run)
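
Assuming the files above sit together in data_science_tools/mlfoundry_example/, running python main.py should create a single MLFoundry run, execute the data-engineering flow, and then log the model, datasets, dataset stats, and metrics from the data-science step against that same run.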