Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Deploy Kubeflow Pipelines from the upstream kustomize manifests.
# PIPELINE_VERSION selects the git ref of the manifests applied below.
export PIPELINE_VERSION=1.8.5
# Apply the cluster-scoped resources first.
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
# Wait (up to 60s) for the Application CRD to be established before applying the env/dev manifests.
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"

# Verify the deployment by port-forwarding the UI service:
#   kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Step 1: forward the ml-pipeline-ui service to local port 8080.
# Change `-n` if you deployed Kubeflow Pipelines into a different namespace.
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80


# Step 2: the following code creates a kfp.Client() against your port-forwarded ml-pipeline-ui service:

# import kfp

# client = kfp.Client(host="http://localhost:8080")

# print(client.list_experiments())
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from typing import List

from kfp import Client
import kfp.dsl
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model
from kfp.v2.dsl import Output


@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    """Download the UCI Iris data and write it to the output artifact as CSV.

    Args:
        iris_dataset: Output artifact; the CSV is written to its ``path``.
    """
    import pandas as pd

    csv_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    col_names = [
        "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"
    ]
    # iris.data has no header row. Passing the names up front stops read_csv
    # from consuming the first sample as column labels (which dropped a row).
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    """Scale the non-label columns of a CSV dataset and write the result as CSV.

    Args:
        input_iris_dataset: Input artifact whose ``path`` holds a CSV that
            contains a ``Labels`` column.
        normalized_iris_dataset: Output artifact; the scaled CSV is written
            to its ``path``.
        standard_scaler: Select ``StandardScaler``.
        min_max_scaler: Select ``MinMaxScaler``.

    Raises:
        ValueError: If both flags carry the same boolean value.
    """
    if standard_scaler is min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as source_csv:
        features = pd.read_csv(source_csv)
    # The labels are set aside so that only the remaining columns are scaled.
    label_column = features.pop('Labels')

    if standard_scaler:
        scaler = StandardScaler()
    if min_max_scaler:
        scaler = MinMaxScaler()

    scaled_values = scaler.fit_transform(features)
    normalized = pd.DataFrame(scaled_values)
    normalized['Labels'] = label_column
    with open(normalized_iris_dataset.path, 'w') as target_csv:
        normalized.to_csv(target_csv)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    """Fit a k-nearest-neighbours classifier and pickle it to the model artifact.

    Args:
        normalized_iris_dataset: Input artifact whose ``path`` holds a CSV
            that contains a ``Labels`` column.
        model: Output artifact; the pickled classifier is written to its
            ``path``.
        n_neighbors: Passed to ``KNeighborsClassifier(n_neighbors=...)``.
    """
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as dataset_csv:
        features = pd.read_csv(dataset_csv)

    targets = features.pop('Labels')

    # Only the training split is used here; the held-out split is discarded.
    train_features, _, train_targets, _ = train_test_split(
        features, targets, random_state=0)

    classifier = KNeighborsClassifier(n_neighbors=n_neighbors)
    classifier.fit(train_features, train_targets)
    with open(model.path, 'wb') as model_file:
        pickle.dump(classifier, model_file)


@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: List[int],
):
    """Create the dataset, normalize it, then train one model per neighbor count.

    Args:
        standard_scaler: Forwarded to ``normalize_dataset``.
        min_max_scaler: Forwarded to ``normalize_dataset``.
        neighbors: Values iterated with ``dsl.ParallelFor``; each one is
            passed to ``train_model`` as ``n_neighbors``.
    """
    create_dataset_task = create_dataset()

    # Forward the pipeline-level scaler flags. They used to be hard-coded to
    # True/False here, so the values supplied for a run were silently ignored.
    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=standard_scaler,
        min_max_scaler=min_max_scaler)

    with dsl.ParallelFor(neighbors) as n_neighbors:
        train_model(
            normalized_iris_dataset=normalize_dataset_task
            .outputs['normalized_iris_dataset'],
            n_neighbors=n_neighbors)


# Address exposed locally as a result of port-forwarding.
endpoint = 'http://localhost:8080'
# Alternative endpoint, obtained from running
# `kubectl cluster-info --context kind-mlewp` (kind-mlewp is the cluster name):
#endpoint = 'https://127.0.0.1:50663'
kfp_client = Client(host=endpoint)

# Submit the pipeline function directly as a run with these parameter values.
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments={
        'min_max_scaler': True,
        'standard_scaler': False,
        'neighbors': [3, 6, 9]
    },
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
)

# Print a link to the run's details page in the UI.
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from typing import List

from kfp import Client
import kfp.dsl
from kfp.v2 import dsl
from kfp.v2.dsl import Dataset
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model
from kfp.v2.dsl import Output


@dsl.component(packages_to_install=['pandas==1.3.5'])
def create_dataset(iris_dataset: Output[Dataset]):
    """Download the UCI Iris data and write it to the output artifact as CSV.

    Args:
        iris_dataset: Output artifact; the CSV is written to its ``path``.
    """
    import pandas as pd

    csv_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    col_names = [
        "Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"
    ]
    # iris.data has no header row. Passing the names up front stops read_csv
    # from consuming the first sample as column labels (which dropped a row).
    df = pd.read_csv(csv_url, names=col_names)

    with open(iris_dataset.path, 'w') as f:
        df.to_csv(f)


# @dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
# def normalize_dataset(
# input_iris_dataset: Input[Dataset],
# normalized_iris_dataset: Output[Dataset],
# standard_scaler: bool,
# min_max_scaler: bool,
# ):
# if standard_scaler is min_max_scaler:
# raise ValueError(
# 'Exactly one of standard_scaler or min_max_scaler must be True.')

# import pandas as pd
# from sklearn.preprocessing import MinMaxScaler
# from sklearn.preprocessing import StandardScaler

# with open(input_iris_dataset.path) as f:
# df = pd.read_csv(f)
# labels = df.pop('Labels')

# if standard_scaler:
# scaler = StandardScaler()
# if min_max_scaler:
# scaler = MinMaxScaler()

# df = pd.DataFrame(scaler.fit_transform(df))
# df['Labels'] = labels
# with open(normalized_iris_dataset.path, 'w') as f:
# df.to_csv(f)


# @dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2'])
# def train_model(
# normalized_iris_dataset: Input[Dataset],
# model: Output[Model],
# n_neighbors: int,
# ):
# import pickle

# import pandas as pd
# from sklearn.model_selection import train_test_split
# from sklearn.neighbors import KNeighborsClassifier

# with open(normalized_iris_dataset.path) as f:
# df = pd.read_csv(f)

# y = df.pop('Labels')
# X = df

# X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

# clf = KNeighborsClassifier(n_neighbors=n_neighbors)
# clf.fit(X_train, y_train)
# with open(model.path, 'wb') as f:
# pickle.dump(clf, f)


# @dsl.pipeline(name='iris-training-pipeline')
# def my_pipeline(
# standard_scaler: bool,
# min_max_scaler: bool,
# neighbors: List[int],
# ):
# create_dataset_task = create_dataset()

# normalize_dataset_task = normalize_dataset(
# input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
# standard_scaler=True,
# min_max_scaler=False)

# with dsl.ParallelFor(neighbors) as n_neighbors:
# train_model(
# normalized_iris_dataset=normalize_dataset_task
# .outputs['normalized_iris_dataset'],
# n_neighbors=n_neighbors)

@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline():
    """Single-step pipeline that only invokes the create_dataset component."""
    # No later step consumes the task, so its result is not bound to a name.
    create_dataset()

# Address exposed locally as a result of port-forwarding.
endpoint = 'http://localhost:8080'
# Alternative endpoint, obtained from running
# `kubectl cluster-info --context kind-mlewp` (kind-mlewp is the cluster name):
#endpoint = 'https://127.0.0.1:50663'
kfp_client = Client(host=endpoint)

# Stray non-ASCII characters that made this call a syntax error were removed.
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    # my_pipeline takes no parameters; an empty mapping is passed explicitly.
    # NOTE(review): the kfp 1.8 client appears to require `arguments` - confirm.
    arguments={},
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
)

# `url` was printed without ever being defined; build the run-details link
# from the endpoint and the id of the run that was just created.
url = f'{endpoint}/#/runs/details/{run.run_id}'
print(url)
Loading