Skip to content

Commit

Permalink
feat(cli): run pipeline from CLI directly
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Coufal <tcoufal@redhat.com>
  • Loading branch information
tumido committed Dec 2, 2024
1 parent 21a27bf commit 8b77716
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 16 deletions.
65 changes: 53 additions & 12 deletions pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,8 @@
STANDALONE_TEMPLATE_FILE_NAME = "standalone.tpl"
GENERATED_STANDALONE_FILE_NAME = "standalone.py"
DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git"
KFP_MODEL_SERVER_CM = "sdg/kfp-model-server.yaml"

# FIXME: This value is specific to ocp-beta-test.nerc.mghpcc.org cluster, `ilab` namespace. It is quite cumbersome to copypaste and remember the path every time in dev. This default value should go away once we reach feature freeze.
BASE_MODEL = "s3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter"

# eval args
MMLU_TASKS_LIST = "mmlu_anatomy,mmlu_astronomy"
FEW_SHOTS = 5
# BATCH_SIZE can also be an int, for example "8" is converted to an int in eval/final
BATCH_SIZE = "auto"
Expand Down Expand Up @@ -113,8 +108,10 @@ def pipeline(
# SDG phase
sdg_repo_url: str = "https://github.com/instructlab/taxonomy.git",
sdg_repo_branch: Optional[str] = None,
sdg_repo_pr: Optional[int] = None,
sdg_base_model: str = BASE_MODEL,
sdg_repo_pr: Optional[
int
] = None, # FIXME: https://issues.redhat.com/browse/RHOAIRFE-467
sdg_base_model: str = "s3://<BUCKET>/<PATH_TO_MODEL>",
sdg_scale_factor: int = 2, # Renamed upstream https://github.com/instructlab/instructlab/blob/f7d40f6ed5112d59132dd832bd332fa6fbbe7010/src/instructlab/configuration.py#L279-L290
sdg_pipeline: str = SDG_PIPELINE,
sdg_max_batch_len: int = MAX_BATCH_LEN,
Expand Down Expand Up @@ -142,7 +139,7 @@ def pipeline(
final_eval_batch_size: str = BATCH_SIZE,
final_eval_merge_system_user_message: bool = MERGE_SYSTEM_USER_MESSAGE,
# Other options
k8s_storage_class_name: str = "nfs-csi",
k8s_storage_class_name: str = "standard", # FIXME: https://github.com/kubeflow/pipelines/issues/11396, https://issues.redhat.com/browse/RHOAIRFE-470
):
"""InstructLab pipeline
Expand Down Expand Up @@ -247,10 +244,6 @@ def pipeline(
artifact_uri=sdg_base_model, artifact_class=dsl.Model
)

# We need to pass storage_class_name as "" to use the default StorageClass, if left empty, KFP uses "standard" StorageClass.
# 'standard' != default StorageClass
# https://github.com/kubeflow/pipelines/blob/1cded35cf5e93d8c8d32fefbddceb2eed8de9a0a/backend/src/v2/driver/driver.go#L1428-L1436
# At least we made it a pipeline parameter
model_pvc_task = CreatePVC(
pvc_name_suffix="-model-cache",
access_modes=["ReadWriteMany"],
Expand Down Expand Up @@ -535,6 +528,54 @@ def generate_pipeline(mock):
compiler.Compiler().compile(pipeline_func, pipeline_file)


@cli.command(name="run")
@click.option(
"--mock",
type=click.Choice(MOCKED_STAGES, case_sensitive=False),
help="Mock part of the pipeline",
multiple=True,
default=[],
)
@click.option("-e", "--experiment", help="Set KFP experiment name.")
@click.option("-r", "--run", "run_name", help="Set KFP run name.")
@click.option(
"-p",
"--param",
help="Override default parameters in KEY=VALUE format. Default parameters are suitable for dev cluster - the MOC cluster, `ilab` namespace.",
multiple=True,
)
def run(mock, experiment, run_name, param):
"""
Run the pipeline immediately against current kubernetes context (cluster and namespace).
Command sets expected dev-cluster friendly default values when submitting.
"""
from utils.kfp_client import get_kfp_client

client = get_kfp_client()

dev_arguments = {
"k8s_storage_class_name": "nfs-csi",
"sdg_base_model": "s3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter",
"train_nproc_per_node": 2,
}

try:
parsed_params = dict(item.split("=") for item in param)
except ValueError as e:
raise click.BadOptionUsage(
"param", "Parameters are required to be passed in KEY=VALUE format"
) from e

arguments = {**dev_arguments, **parsed_params}
client.create_run_from_pipeline_func(
pipeline_func=ilab_pipeline_wrapper(mock),
experiment_name=experiment,
run_name=run_name,
arguments=arguments,
)


@cli.command(name="gen-standalone")
def gen_standalone():
"""
Expand Down
8 changes: 4 additions & 4 deletions pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
# final_eval_few_shots: int [Default: 5.0]
# final_eval_max_workers: str [Default: 'auto']
# final_eval_merge_system_user_message: bool [Default: False]
# k8s_storage_class_name: str [Default: 'nfs-csi']
# k8s_storage_class_name: str [Default: 'standard']
# mt_bench_max_workers: str [Default: 'auto']
# mt_bench_merge_system_user_message: bool [Default: False]
# sdg_base_model: str [Default: 's3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter']
# sdg_base_model: str [Default: 's3://<BUCKET>/<PATH_TO_MODEL>']
# sdg_max_batch_len: int [Default: 20000.0]
# sdg_pipeline: str [Default: 'simple']
# sdg_repo_branch: str
Expand Down Expand Up @@ -2042,7 +2042,7 @@ root:
isOptional: true
parameterType: BOOLEAN
k8s_storage_class_name:
defaultValue: nfs-csi
defaultValue: standard
description: A Kubernetes StorageClass name for persistent volumes. Selected
StorageClass must support RWX PersistentVolumes.
isOptional: true
Expand All @@ -2060,7 +2060,7 @@ root:
isOptional: true
parameterType: BOOLEAN
sdg_base_model:
defaultValue: s3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter
defaultValue: s3://<BUCKET>/<PATH_TO_MODEL>
description: SDG parameter. LLM model used to generate the synthetic dataset
isOptional: true
parameterType: STRING
Expand Down
56 changes: 56 additions & 0 deletions utils/kfp_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# type: ignore
import warnings

from kfp import Client
from kubernetes.client import CustomObjectsApi
from kubernetes.client.configuration import Configuration
from kubernetes.client.exceptions import ApiException
from kubernetes.config import list_kube_config_contexts
from kubernetes.config.config_exception import ConfigException
from kubernetes.config.kube_config import load_kube_config


def get_kfp_client():
config = Configuration()
try:
load_kube_config(client_configuration=config)
token = config.api_key["authorization"].split(" ")[-1]
except (KeyError, ConfigException) as e:
raise ApiException(
401, "Unauthorized, try running `oc login` command first"
) from e
Configuration.set_default(config)

_, active_context = list_kube_config_contexts()
namespace = active_context["context"]["namespace"]

dspas = CustomObjectsApi().list_namespaced_custom_object(
"datasciencepipelinesapplications.opendatahub.io",
"v1alpha1",
namespace,
"datasciencepipelinesapplications",
)

try:
dspa = dspas["items"][0]
except IndexError as e:
raise ApiException(404, "DataSciencePipelines resource not found") from e

try:
if dspa["spec"]["dspVersion"] != "v2":
raise KeyError
except KeyError as e:
raise EnvironmentError(
"Installed version of Kubeflow Pipelines does not meet minimal version criteria. Use KFPv2 please."
) from e

try:
host = dspa["status"]["components"]["apiServer"]["externalUrl"]
except KeyError as e:
raise ApiException(
409,
"DataSciencePipelines resource is not ready. Check for .status.components.apiServer",
) from e

with warnings.catch_warnings(action="ignore"):
return Client(existing_token=token, host=host)

0 comments on commit 8b77716

Please sign in to comment.