diff --git a/program/metaflow/helloworld.py b/program/metaflow/helloworld.py index acf0a71..55d8281 100644 --- a/program/metaflow/helloworld.py +++ b/program/metaflow/helloworld.py @@ -235,7 +235,7 @@ def tune_model(self): validation_data=(self.X_validation, self.y_validation), batch_size=32, epochs=50, - verbose=, + verbose=2, ) tuner.results_summary() diff --git a/program/metaflow/training.py b/program/metaflow/training.py index c5a3419..8c7fca5 100644 --- a/program/metaflow/training.py +++ b/program/metaflow/training.py @@ -23,6 +23,43 @@ def load_data_from_file(): return pd.read_csv(location) +def build_features_transformer(): + from sklearn.compose import ColumnTransformer, make_column_selector + from sklearn.impute import SimpleImputer + from sklearn.pipeline import make_pipeline + from sklearn.preprocessing import OneHotEncoder, StandardScaler + + numeric_transformer = make_pipeline( + SimpleImputer(strategy="mean"), + StandardScaler(), + ) + + categorical_transformer = make_pipeline( + SimpleImputer(strategy="most_frequent"), + OneHotEncoder(), + ) + + return ColumnTransformer( + transformers=[ + ( + "numeric", + numeric_transformer, + make_column_selector(dtype_exclude="object"), + ), + ("categorical", categorical_transformer, ["island"]), + ], + ) + + +def build_target_transformer(): + from sklearn.compose import ColumnTransformer + from sklearn.preprocessing import OrdinalEncoder + + return ColumnTransformer( + transformers=[("species", OrdinalEncoder(), [0])], + ) + + def build_model(nodes, learning_rate): from keras import Input from keras.layers import Dense @@ -68,6 +105,9 @@ def build_tuner_model(hp): "scikit-learn": "1.4.1.post1", "pandas": "2.2.1", "numpy": "1.26.4", + "keras": "3.3.0", + "jax[cpu]": "0.4.26", + "packaging": "24.0", }, ) class TrainingFlow(FlowSpec): @@ -85,8 +125,6 @@ class TrainingFlow(FlowSpec): @step def start(self): - self.my_var = "hello world" - self.next(self.load_data) @pypi(packages={"boto3": "1.34.70"}) @@ -111,101 +149,72 @@ def load_data(self): print(f"Loaded dataset with {len(self.data)} samples") - self.next(self.setup_target_transformer) + self.next(self.prepare_dataset) @step - def setup_target_transformer(self): - from sklearn.compose import ColumnTransformer - from sklearn.preprocessing import OrdinalEncoder + def prepare_dataset(self): + import numpy as np - self.target_transformer = ColumnTransformer( - transformers=[("species", OrdinalEncoder(), [0])], - ) + self.target = np.array(self.data["species"]).reshape(-1, 1) + self.features = self.data.drop("species", axis=1) - self.next(self.setup_features_transformer) + self.next(self.transform_target, self.cross_validation) @step - def setup_features_transformer(self): - from sklearn.compose import ColumnTransformer, make_column_selector - from sklearn.impute import SimpleImputer - from sklearn.pipeline import make_pipeline - from sklearn.preprocessing import OneHotEncoder, StandardScaler - - numeric_transformer = make_pipeline( - SimpleImputer(strategy="mean"), - StandardScaler(), - ) + def transform_target(self): + target_transformer = build_target_transformer() + self.y = target_transformer.fit_transform(self.target) - categorical_transformer = make_pipeline( - SimpleImputer(strategy="most_frequent"), - OneHotEncoder(), - ) + self.next(self.transform_features) - self.features_transformer = ColumnTransformer( - transformers=[ - ( - "numeric", - numeric_transformer, - make_column_selector(dtype_exclude="object"), - ), - ("categorical", categorical_transformer, ["island"]), - ], - ) + @step + def transform_features(self): + features_transformer = build_features_transformer() + self.x = features_transformer.fit_transform(self.features) - self.next(self.split_dataset) + self.next(self.train_model) @step - def split_dataset(self): - """Split the data into train, validation, and test.""" + def cross_validation(self): from sklearn.model_selection import KFold - self.target = self.data["species"] - self.features = self.data.drop("species", axis=1) - kfold = KFold(n_splits=5, shuffle=True) self.folds = list(enumerate(kfold.split(self.target, self.features))) - self.next(self.transform_target, foreach="folds") + self.next(self.transform_target_fold, foreach="folds") @step - def transform_target(self): - import numpy as np - + def transform_target_fold(self): + """Apply the transformation pipeline to the target feature.""" self.fold, (self.train_indices, self.test_indices) = self.input - self.y_train = np.squeeze( - self.target_transformer.fit_transform( - np.array(self.target.iloc[self.train_indices]).reshape(-1, 1), - ), + target_transformer = build_target_transformer() + self.y_train = target_transformer.fit_transform( + self.target[self.train_indices], ) - self.y_test = np.squeeze( - self.target_transformer.transform( - np.array(self.target.iloc[self.test_indices]).reshape(-1, 1), - ), + + self.y_test = target_transformer.transform( + self.target[self.test_indices], ) - self.next(self.transform_features) + self.next(self.transform_features_fold) @step - def transform_features(self): - self.x_train = self.features_transformer.fit_transform( + def transform_features_fold(self): + """Apply the transformation pipeline to the dataset features.""" + features_transformer = build_features_transformer() + self.x_train = features_transformer.fit_transform( self.features.iloc[self.train_indices], ) - self.x_test = self.features_transformer.transform( + self.x_test = features_transformer.transform( self.features.iloc[self.test_indices], ) - self.next(self.train_model) + self.next(self.train_model_fold) - @pypi( - packages={ - "keras": "3.3.0", - "jax[cpu]": "0.4.26", - "packaging": "24.0", - }, - ) @step - def train_model(self): + def train_model_fold(self): + """Train a model as part of the cross-validation process.""" print(f"Training fold {self.fold}...") self.model = build_model(10, 0.01) @@ -218,17 +227,11 @@ def train_model(self): verbose=2, ) - self.next(self.evaluate_model) + self.next(self.evaluate_model_fold) - @pypi( - packages={ - "keras": "3.3.0", - "jax[cpu]": "0.4.26", - "packaging": "24.0", - }, - ) @step - def evaluate_model(self): + def evaluate_model_fold(self): + """Evaluate a model created as part of the cross-validation process.""" print(f"Evaluating fold {self.fold}...") self.loss, self.accuracy = self.model.evaluate( @@ -238,10 +241,10 @@ def evaluate_model(self): ) print(f"Fold {self.fold} - loss: {self.loss} - accuracy: {self.accuracy}") - self.next(self.reduce) + self.next(self.evaluate_model) @step - def reduce(self, inputs): + def evaluate_model(self, inputs): import numpy as np accuracies = [i.accuracy for i in inputs] @@ -250,6 +253,26 @@ def reduce(self, inputs): print(f"Accuracy: {accuracy} +-{accuracy_std}") + self.next(self.train_model) + + @step + def train_model(self, inputs): + """Train the final model that will be deployed to production. + + This function will use the entire dataset to train the model. + """ + self.merge_artifacts(inputs) + + self.model = build_model(10, 0.01) + + self.model.fit( + self.x, + self.y, + epochs=50, + batch_size=32, + verbose=2, + ) + self.next(self.end) @step