Modified to use dataset and pipelinedata #193

Merged · 10 commits · Feb 14, 2020
2 changes: 0 additions & 2 deletions .env.example
@@ -32,8 +32,6 @@ EVALUATE_SCRIPT_PATH = 'evaluate/evaluate_model.py'
REGISTER_SCRIPT_PATH = 'register/register_model.py'
SOURCES_DIR_TRAIN = 'diabetes_regression'
DATASET_NAME = 'diabetes_ds'
DATASTORE_NAME = 'datablobstore'
DATAFILE_NAME = 'diabetes.csv'

# Optional. Used by a training pipeline with R on Databricks
DB_CLUSTER_ID = ''
19 changes: 6 additions & 13 deletions diabetes_regression/evaluate/evaluate_model.py
@@ -90,6 +90,7 @@
help="Name of the Model",
default="sklearn_regression_model.pkl",
)

parser.add_argument(
"--allow_run_cancel",
type=str,
@@ -122,18 +123,10 @@
model_name, tag_name, exp.name, ws)

if (model is not None):

production_model_run_id = model.run_id

# Get the run history for both production model and
# newly trained model and compare mse
production_model_run = Run(exp, run_id=production_model_run_id)
new_model_run = run.parent
print("Production model run is", production_model_run)

production_model_mse = \
production_model_run.get_metrics().get(metric_eval)
new_model_mse = new_model_run.get_metrics().get(metric_eval)
production_model_mse = 10000
if (metric_eval in model.tags):
production_model_mse = float(model.tags[metric_eval])
new_model_mse = float(run.parent.get_metrics().get(metric_eval))
if (production_model_mse is None or new_model_mse is None):
print("Unable to find", metric_eval, "metrics, "
"exiting evaluation")
@@ -151,7 +144,7 @@
print("New trained model performs better, "
"thus it should be registered")
else:
print("New trained model metric is less than or equal to "
print("New trained model metric is worse than or equal to "
"production model so skipping model registration.")
if((allow_run_cancel).lower() == 'true'):
run.parent.cancel()
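Since the hunk above interleaves the removed run-history lookup with its replacement, here is a consolidated sketch of the new evaluation logic. It assumes `model` is the registered production model located earlier in the script and `run` is the current step run, with `metric_eval` set to "mse" as elsewhere in this file; the sentinel value mirrors the diff.

```python
# Consolidated sketch of the revised comparison (assumes `model`, `run`,
# and metric_eval == "mse" from the surrounding script).
metric_eval = "mse"

# Sentinel: a production model that never recorded an mse always loses.
production_model_mse = 10000
if metric_eval in model.tags:
    production_model_mse = float(model.tags[metric_eval])

new_model_mse = float(run.parent.get_metrics().get(metric_eval))

if new_model_mse < production_model_mse:
    print("New trained model performs better, "
          "thus it should be registered")
else:
    print("New trained model metric is worse than or equal to "
          "production model so skipping model registration.")
```

Reading the metric from the model's tags avoids a second round-trip to the production model's run history, at the cost of requiring the registration step to stamp the tag, which the register_model.py changes below take care of.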
83 changes: 55 additions & 28 deletions diabetes_regression/register/register_model.py
@@ -27,6 +27,7 @@
import sys
import argparse
import traceback
import joblib
from azureml.core import Run, Experiment, Workspace
from azureml.core.model import Model as AMLModel

@@ -63,17 +64,24 @@ def main():
type=str,
help="The Build ID of the build triggering this pipeline run",
)

parser.add_argument(
"--run_id",
type=str,
help="Training run ID",
)

parser.add_argument(
"--model_name",
type=str,
help="Name of the Model",
default="sklearn_regression_model.pkl",
)
parser.add_argument(
"--step_input",
type=str,
help=("input from previous steps")
)

args = parser.parse_args()
if (args.build_id is not None):
@@ -83,18 +91,42 @@ def main():
if (run_id == 'amlcompute'):
run_id = run.parent.id
model_name = args.model_name
model_path = args.step_input

if (build_id is None):
register_aml_model(model_name, exp, run_id)
else:
run.tag("BuildId", value=build_id)
builduri_base = os.environ.get("BUILDURI_BASE")
if (builduri_base is not None):
build_uri = builduri_base + build_id
run.tag("BuildUri", value=build_uri)
register_aml_model(model_name, exp, run_id, build_id, build_uri)
# load the model
print("Loading model from " + model_path)
model_file = os.path.join(model_path, model_name)
model = joblib.load(model_file)
model_mse = run.parent.get_metrics()["mse"]

if (model is not None):
if (build_id is None):
register_aml_model(model_file, model_name, exp, run_id)
else:
register_aml_model(model_name, exp, run_id, build_id)
run.tag("BuildId", value=build_id)
builduri_base = os.environ.get("BUILDURI_BASE")
if (builduri_base is not None):
build_uri = builduri_base + build_id
run.tag("BuildUri", value=build_uri)
register_aml_model(
model_file,
model_name,
model_mse,
exp,
run_id,
build_id,
build_uri)
else:
register_aml_model(
model_file,
model_name,
model_mse,
exp,
run_id,
build_id)
else:
print("Model not found. Skipping model registration.")
sys.exit(0)


def model_already_registered(model_name, exp, run_id):
@@ -109,35 +141,30 @@ def model_already_registered(model_name, exp, run_id):


def register_aml_model(
model_path,
model_name,
model_mse,
exp,
run_id,
build_id: str = 'none',
build_uri=None
):
try:
tagsValue = {"area": "diabetes_regression",
"run_id": run_id,
"experiment_name": exp.name,
"mse": model_mse}
if (build_id != 'none'):
model_already_registered(model_name, exp, run_id)
run = Run(experiment=exp, run_id=run_id)
tagsValue = {"area": "diabetes_regression",
"BuildId": build_id, "run_id": run_id,
"experiment_name": exp.name}
tagsValue["BuildId"] = build_id
if (build_uri is not None):
tagsValue["BuildUri"] = build_uri
else:
run = Run(experiment=exp, run_id=run_id)
if (run is not None):
tagsValue = {"area": "diabetes_regression",
"run_id": run_id, "experiment_name": exp.name}
else:
print("A model run for experiment", exp.name,
"matching properties run_id =", run_id,
"was not found. Skipping model registration.")
sys.exit(0)

model = run.register_model(model_name=model_name,
model_path="./outputs/" + model_name,
tags=tagsValue)

model = AMLModel.register(
workspace=exp.workspace,
model_name=model_name,
model_path=model_path,
tags=tagsValue)
os.chdir("..")
print(
"Model registered: {} \nModel Description: {} "
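Taken together, the register step now (1) receives the model file through the PipelineData folder passed as `--step_input`, (2) checks that the artifact actually loads, and (3) registers it with `AMLModel.register`, tagging it with the mse read from the parent run. A minimal sketch under those assumptions follows; the mount path is illustrative, not an API guarantee.

```python
import os

import joblib
from azureml.core import Run
from azureml.core.model import Model as AMLModel

run = Run.get_context()
model_path = "/mnt/pipeline_data"             # hypothetical --step_input mount
model_name = "sklearn_regression_model.pkl"

model_file = os.path.join(model_path, model_name)
model = joblib.load(model_file)               # fail fast if the artifact is unusable
model_mse = run.parent.get_metrics()["mse"]   # metric logged by the training step

tags = {"area": "diabetes_regression",
        "run_id": run.parent.id,
        "experiment_name": run.experiment.name,
        "mse": model_mse}
registered = AMLModel.register(
    workspace=run.experiment.workspace,
    model_name=model_name,
    model_path=model_file,                    # uploads this file as the model
    tags=tags)
print("Model registered: {} version {}".format(registered.name,
                                               registered.version))
```

Registering with an explicit `model_path` also removes the old dependency on the model living under `./outputs` of a specific run, which is what makes the PipelineData handoff possible.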
42 changes: 20 additions & 22 deletions diabetes_regression/training/train.py
@@ -24,10 +24,8 @@
POSSIBILITY OF SUCH DAMAGE.
"""
from azureml.core.run import Run
from azureml.core import Dataset
import os
import argparse
from sklearn.datasets import load_diabetes
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
@@ -65,19 +63,20 @@ def main():
)

parser.add_argument(
"--dataset_name",
"--step_output",
type=str,
help=("Dataset with the training data")
help=("output for passing data to next step")
)

args = parser.parse_args()

print("Argument [build_id]: %s" % args.build_id)
print("Argument [model_name]: %s" % args.model_name)
print("Argument [dataset_name]: %s" % args.dataset_name)
print("Argument [step_output]: %s" % args.step_output)

model_name = args.model_name
build_id = args.build_id
dataset_name = args.dataset_name
step_output_path = args.step_output

print("Getting training parameters")

@@ -91,15 +90,17 @@
print("Parameter alpha: %s" % alpha)

run = Run.get_context()
ws = run.experiment.workspace

if (dataset_name):
dataset = Dataset.get_by_name(workspace=ws, name=dataset_name)
# Get the dataset
dataset = run.input_datasets['training_data']
if (dataset):
df = dataset.to_pandas_dataframe()
X = df.values
y = df.Y
else:
X, y = load_diabetes(return_X_y=True)
e = ("No dataset provided")
print(e)
raise Exception(e)

X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=0)
@@ -108,21 +109,18 @@

reg = train_model(run, data, alpha)

joblib.dump(value=reg, filename=model_name)

# upload model file explicitly into artifacts for parent run
run.parent.upload_file(name="./outputs/" + model_name,
path_or_stream=model_name)
print("Uploaded the model {} to experiment {}".format(
model_name, run.experiment.name))
dirpath = os.getcwd()
print(dirpath)
print("Following files are uploaded ")
print(run.parent.get_file_names())
# Pass model file to next step
os.makedirs(step_output_path, exist_ok=True)
model_output_path = os.path.join(step_output_path, model_name)
joblib.dump(value=reg, filename=model_output_path)

run.parent.tag("BuildId", value=build_id)
# Also upload model file to run outputs for history
os.makedirs('outputs', exist_ok=True)
output_path = os.path.join('outputs', model_name)
joblib.dump(value=reg, filename=output_path)

# Add properties to identify this specific training run
run.parent.tag("BuildId", value=build_id)
run.tag("BuildId", value=build_id)
run.tag("run_type", value="train")
builduri_base = os.environ.get("BUILDURI_BASE")
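The net effect in train.py: the dataset now arrives as a named input resolved by the SDK rather than being fetched with `Dataset.get_by_name`, and the trained model is written twice, once into the PipelineData folder for the register step and once into `./outputs` for run history. A condensed sketch (the path is illustrative, and the feature/label split is written out the conventional way):

```python
import os

import joblib
from azureml.core import Run
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split

run = Run.get_context()
model_name = "sklearn_regression_model.pkl"
step_output_path = "/mnt/pipeline_data"   # hypothetical --step_output value

# Resolved because the step declared dataset.as_named_input('training_data').
dataset = run.input_datasets['training_data']
df = dataset.to_pandas_dataframe()
X = df.drop('Y', axis=1).values           # features, label column excluded
y = df['Y'].values

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0)
reg = Ridge(alpha=0.5).fit(X_train, y_train)

# 1) Hand the model to the register step through the PipelineData folder.
os.makedirs(step_output_path, exist_ok=True)
joblib.dump(value=reg, filename=os.path.join(step_output_path, model_name))

# 2) Keep a copy under ./outputs so it is also stored with the run history.
os.makedirs('outputs', exist_ok=True)
joblib.dump(value=reg, filename=os.path.join('outputs', model_name))
```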
7 changes: 5 additions & 2 deletions docs/getting_started.md
@@ -75,6 +75,10 @@ the BASE_NAME value should not exceed 10 characters and it should contain number

The **RESOURCE_GROUP** parameter is used as the name for the resource group that will hold the Azure resources for the solution. If providing an existing AML Workspace, set this value to the corresponding resource group name.

The **WORKSPACE_SVC_CONNECTION** parameter is used to reference a service connection for the Azure ML workspace. You will create this service connection after provisioning the workspace (we recommend using the IaC pipeline, as described below) and installing the Azure ML extension in your Azure DevOps project.

Optionally, a **DATASET_NAME** parameter can be used to reference a training dataset that you have registered in your Azure ML workspace (more details below).

Make sure to select the **Allow access to all pipelines** checkbox in the
variable group configuration.

@@ -125,8 +129,7 @@ Check out the newly created resources in the [Azure Portal](https://portal.azure

(Optional) To remove the resources created for this project, you can use the [/environment_setup/iac-remove-environment.yml](../environment_setup/iac-remove-environment.yml) definition, or simply delete the resource group in the [Azure Portal](https://portal.azure.com).

**Note:** The training ML pipeline uses a [sample diabetes dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_diabetes.html) as training data. If you want to use your own dataset, you need to [create and register a datastore](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-access-data#azure-machine-learning-studio) in your ML workspace and upload the datafile (e.g. [diabetes.csv](./data/diabetes.csv)) to the corresponding blob container. You can also define a datastore in the ML Workspace with [az cli](https://docs.microsoft.com/en-us/cli/azure/ext/azure-cli-ml/ml/datastore?view=azure-cli-latest#ext-azure-cli-ml-az-ml-datastore-attach-blob).
You'll also need to configure DATASTORE_NAME and DATAFILE_NAME variables in ***devopsforai-aml-vg*** variable group.
**Note:** The training ML pipeline uses a [sample diabetes dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_diabetes.html) as training data. To use your own data, you need to [create a Dataset](https://docs.microsoft.com/azure/machine-learning/how-to-create-register-datasets) in your workspace and specify its name in a DATASET_NAME variable in the ***devopsforai-aml-vg*** variable group. You will also need to modify the test cases in the **ml_service/util/smoke_test_scoring_service.py** script to match the schema of the training features in your dataset.

## Create an Azure DevOps Azure ML Workspace Service Connection

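To make the DATASET_NAME note concrete, the following is a hedged example (not code from this repo) of uploading a CSV and registering it as a tabular dataset; `my_data.csv` and `my_dataset` are placeholders.

```python
from azureml.core import Dataset, Workspace

ws = Workspace.from_config()              # assumes a local config.json
datastore = ws.get_default_datastore()

# Upload the file, then register a tabular dataset that points at it.
datastore.upload_files(files=['my_data.csv'],
                       target_path='training-data/',
                       overwrite=True)
dataset = Dataset.Tabular.from_delimited_files(
    path=(datastore, 'training-data/my_data.csv'))
dataset.register(workspace=ws,
                 name='my_dataset',
                 create_new_version=True)
```

After registering, set DATASET_NAME in the ***devopsforai-aml-vg*** variable group to the name you chose.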
64 changes: 51 additions & 13 deletions ml_service/pipelines/diabetes_regression_build_train_pipeline.py
@@ -1,11 +1,14 @@
from azureml.pipeline.core.graph import PipelineParameter
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.core import Workspace, Environment
from azureml.core.runconfig import RunConfiguration
from azureml.core import Dataset, Datastore
from azureml.core import Dataset
from ml_service.util.attach_compute import get_compute
from ml_service.util.env_variables import Env
from sklearn.datasets import load_diabetes
import pandas as pd
import os


def main():
@@ -45,26 +48,59 @@ def main():
build_id_param = PipelineParameter(
name="build_id", default_value=e.build_id)

dataset_name = ""
if (e.datastore_name is not None and e.datafile_name is not None):
dataset_name = e.dataset_name
datastore = Datastore.get(aml_workspace, e.datastore_name)
data_path = [(datastore, e.datafile_name)]
dataset = Dataset.Tabular.from_delimited_files(path=data_path)
dataset.register(workspace=aml_workspace,
name=e.dataset_name,
description="dataset with training data",
create_new_version=True)
# Get dataset name
dataset_name = e.dataset_name

# Check to see if dataset exists
if (dataset_name not in aml_workspace.datasets):
# Create dataset from diabetes sample data
sample_data = load_diabetes()
df = pd.DataFrame(
data=sample_data.data,
columns=sample_data.feature_names)
df['Y'] = sample_data.target
file_name = 'diabetes.csv'
df.to_csv(file_name, index=False)

# Upload file to default datastore in workspace
default_ds = aml_workspace.get_default_datastore()
target_path = 'training-data/'
default_ds.upload_files(
files=[file_name],
target_path=target_path,
overwrite=True,
show_progress=False)

# Register dataset
path_on_datastore = os.path.join(target_path, file_name)
dataset = Dataset.Tabular.from_delimited_files(
path=(default_ds, path_on_datastore))
dataset = dataset.register(
workspace=aml_workspace,
name=dataset_name,
description='diabetes training data',
tags={'format': 'CSV'},
create_new_version=True)

# Get the dataset
dataset = Dataset.get_by_name(aml_workspace, dataset_name)

# Create a PipelineData to pass data between steps
pipeline_data = PipelineData(
'pipeline_data',
datastore=aml_workspace.get_default_datastore())

train_step = PythonScriptStep(
name="Train Model",
script_name=e.train_script_path,
compute_target=aml_compute,
source_directory=e.sources_directory_train,
inputs=[dataset.as_named_input('training_data')],
outputs=[pipeline_data],
arguments=[
"--build_id", build_id_param,
"--model_name", model_name_param,
"--dataset_name", dataset_name,
"--step_output", pipeline_data
],
runconfig=run_config,
allow_reuse=False,
@@ -91,9 +127,11 @@ def main():
script_name=e.register_script_path,
compute_target=aml_compute,
source_directory=e.sources_directory_train,
inputs=[pipeline_data],
arguments=[
"--build_id", build_id_param,
"--model_name", model_name_param,
"--step_input", pipeline_data,
],
runconfig=run_config,
allow_reuse=False,
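The heart of this PR is the PipelineData handoff: one object is declared as an output of the train step and an input of the register step, and Azure ML mounts the backing datastore folder on both compute contexts. A stripped-down sketch of that wiring, reusing names from the script above (environment setup, the dataset input, and the evaluate step are omitted; the compute target name is hypothetical):

```python
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

aml_workspace = Workspace.from_config()       # assumes a local config.json
aml_compute = aml_workspace.compute_targets['train-cluster']  # hypothetical name

pipeline_data = PipelineData(
    'pipeline_data',
    datastore=aml_workspace.get_default_datastore())

train_step = PythonScriptStep(
    name="Train Model",
    script_name="training/train.py",
    source_directory="diabetes_regression",
    compute_target=aml_compute,
    outputs=[pipeline_data],                     # this step produces the folder
    arguments=["--step_output", pipeline_data],  # resolves to a mounted path
    allow_reuse=False)

register_step = PythonScriptStep(
    name="Register Model",
    script_name="register/register_model.py",
    source_directory="diabetes_regression",
    compute_target=aml_compute,
    inputs=[pipeline_data],                      # same folder, now an input
    arguments=["--step_input", pipeline_data],
    allow_reuse=False)

pipeline = Pipeline(workspace=aml_workspace,
                    steps=[train_step, register_step])
```

Because both `arguments` lists receive the same PipelineData object, each script simply sees a local folder path in `--step_output`/`--step_input` and never needs to know about the datastore underneath.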