From 8b77716edd7eebc6824cbf578a58dd2566fde91f Mon Sep 17 00:00:00 2001 From: Tomas Coufal Date: Mon, 2 Dec 2024 13:30:17 +0100 Subject: [PATCH] feat(cli): run pipeline from CLI directly Signed-off-by: Tomas Coufal --- pipeline.py | 65 ++++++++++++++++++++++++++++++++++++--------- pipeline.yaml | 8 +++--- utils/kfp_client.py | 56 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 utils/kfp_client.py diff --git a/pipeline.py b/pipeline.py index 0f29adc8..5269088b 100644 --- a/pipeline.py +++ b/pipeline.py @@ -28,13 +28,8 @@ STANDALONE_TEMPLATE_FILE_NAME = "standalone.tpl" GENERATED_STANDALONE_FILE_NAME = "standalone.py" DEFAULT_REPO_URL = "https://github.com/instructlab/taxonomy.git" -KFP_MODEL_SERVER_CM = "sdg/kfp-model-server.yaml" - -# FIXME: This value is specific to ocp-beta-test.nerc.mghpcc.org cluster, `ilab` namespace. It is quite cumbersome to copypaste and remember the path every time in dev. This default value should go away once we reach feature freeze. 
-BASE_MODEL = "s3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter" # eval args -MMLU_TASKS_LIST = "mmlu_anatomy,mmlu_astronomy" FEW_SHOTS = 5 # BATCH_SIZE can also be an int, for example "8" is converted to an int in eval/final BATCH_SIZE = "auto" @@ -113,8 +108,10 @@ def pipeline( # SDG phase sdg_repo_url: str = "https://github.com/instructlab/taxonomy.git", sdg_repo_branch: Optional[str] = None, - sdg_repo_pr: Optional[int] = None, - sdg_base_model: str = BASE_MODEL, + sdg_repo_pr: Optional[ + int + ] = None, # FIXME: https://issues.redhat.com/browse/RHOAIRFE-467 + sdg_base_model: str = "s3:///", sdg_scale_factor: int = 2, # Renamed upstream https://github.com/instructlab/instructlab/blob/f7d40f6ed5112d59132dd832bd332fa6fbbe7010/src/instructlab/configuration.py#L279-L290 sdg_pipeline: str = SDG_PIPELINE, sdg_max_batch_len: int = MAX_BATCH_LEN, @@ -142,7 +139,7 @@ def pipeline( final_eval_batch_size: str = BATCH_SIZE, final_eval_merge_system_user_message: bool = MERGE_SYSTEM_USER_MESSAGE, # Other options - k8s_storage_class_name: str = "nfs-csi", + k8s_storage_class_name: str = "standard", # FIXME: https://github.com/kubeflow/pipelines/issues/11396, https://issues.redhat.com/browse/RHOAIRFE-470 ): """InstructLab pipeline @@ -247,10 +244,6 @@ def pipeline( artifact_uri=sdg_base_model, artifact_class=dsl.Model ) - # We need to pass storage_class_name as "" to use the default StorageClass, if left empty, KFP uses "standard" StorageClass. 
-    # 'standard' != default StorageClass
-    # https://github.com/kubeflow/pipelines/blob/1cded35cf5e93d8c8d32fefbddceb2eed8de9a0a/backend/src/v2/driver/driver.go#L1428-L1436
-    # At least we made it a pipeline parameter
     model_pvc_task = CreatePVC(
         pvc_name_suffix="-model-cache",
         access_modes=["ReadWriteMany"],
@@ -535,6 +528,65 @@ def generate_pipeline(mock):
             compiler.Compiler().compile(pipeline_func, pipeline_file)
 
 
+@cli.command(name="run")
+@click.option(
+    "--mock",
+    type=click.Choice(MOCKED_STAGES, case_sensitive=False),
+    help="Mock part of the pipeline",
+    multiple=True,
+    default=[],
+)
+@click.option("-e", "--experiment", help="Set KFP experiment name.")
+@click.option("-r", "--run", "run_name", help="Set KFP run name.")
+@click.option(
+    "-p",
+    "--param",
+    help="Override default parameters in KEY=VALUE format. Default parameters are suitable for dev cluster - the MOC cluster, `ilab` namespace.",
+    multiple=True,
+)
+def run(mock, experiment, run_name, param):
+    """
+    Run the pipeline immediately against current kubernetes context (cluster and namespace).
+
+    Command sets expected dev-cluster friendly default values when submitting.
+    Values given with `--param KEY=VALUE` override those defaults and are
+    submitted as strings.
+
+    Raises click.BadOptionUsage if a `--param` value is not in KEY=VALUE format.
+    """
+    # Validate user input first so a malformed --param is reported without
+    # needing a reachable cluster.
+    parsed_params = {}
+    for item in param:
+        # Split on the first "=" only: values (e.g. URLs) may contain "=" themselves.
+        key, separator, value = item.partition("=")
+        if not separator or not key:
+            raise click.BadOptionUsage(
+                "param", "Parameters are required to be passed in KEY=VALUE format"
+            )
+        parsed_params[key] = value
+
+    from utils.kfp_client import get_kfp_client
+
+    client = get_kfp_client()
+
+    # Defaults for the dev cluster; any of them can be overridden via --param.
+    dev_arguments = {
+        "k8s_storage_class_name": "nfs-csi",
+        "sdg_base_model": "s3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter",
+        "train_nproc_per_node": 2,
+    }
+
+    # Later mapping wins, so user-supplied parameters take precedence.
+    arguments = {**dev_arguments, **parsed_params}
+    client.create_run_from_pipeline_func(
+        pipeline_func=ilab_pipeline_wrapper(mock),
+        experiment_name=experiment,
+        run_name=run_name,
+        arguments=arguments,
+    )
+
+
 @cli.command(name="gen-standalone")
 def gen_standalone():
     """
diff --git a/pipeline.yaml b/pipeline.yaml
index 0dbec02a..23b8999c 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -6,10 +6,10 @@
 #    final_eval_few_shots: int [Default: 5.0]
 #    final_eval_max_workers: str [Default: 'auto']
 #    final_eval_merge_system_user_message: bool [Default: False]
-#    k8s_storage_class_name: str [Default: 'nfs-csi']
+#    k8s_storage_class_name: str [Default: 'standard']
 #    mt_bench_max_workers: str [Default: 'auto']
 #    mt_bench_merge_system_user_message: bool [Default: False]
-#    sdg_base_model: str [Default: 's3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter']
+#    sdg_base_model: str [Default: 's3:///']
 #    sdg_max_batch_len: int [Default: 20000.0]
 #    sdg_pipeline: str [Default: 'simple']
 #    sdg_repo_branch: str
@@ -2042,7 +2042,7 @@ root:
         isOptional: true
         parameterType: BOOLEAN
       k8s_storage_class_name:
-        defaultValue: nfs-csi
+        defaultValue: standard
         description: A Kubernetes StorageClass name for persistent volumes. Selected
           StorageClass must support RWX PersistentVolumes.
isOptional: true @@ -2060,7 +2060,7 @@ root: isOptional: true parameterType: BOOLEAN sdg_base_model: - defaultValue: s3://ilab-pipeline-b1d4c2b1-ab00-4e7f-b985-697bda3df385/instructlab-base-importer/648f36d0-e3f0-43b8-8adb-530576beb675/ilab-importer-op/model/granite-7b-starter + defaultValue: s3:/// description: SDG parameter. LLM model used to generate the synthetic dataset isOptional: true parameterType: STRING diff --git a/utils/kfp_client.py b/utils/kfp_client.py new file mode 100644 index 00000000..d43435ab --- /dev/null +++ b/utils/kfp_client.py @@ -0,0 +1,56 @@ +# type: ignore +import warnings + +from kfp import Client +from kubernetes.client import CustomObjectsApi +from kubernetes.client.configuration import Configuration +from kubernetes.client.exceptions import ApiException +from kubernetes.config import list_kube_config_contexts +from kubernetes.config.config_exception import ConfigException +from kubernetes.config.kube_config import load_kube_config + + +def get_kfp_client(): + config = Configuration() + try: + load_kube_config(client_configuration=config) + token = config.api_key["authorization"].split(" ")[-1] + except (KeyError, ConfigException) as e: + raise ApiException( + 401, "Unauthorized, try running `oc login` command first" + ) from e + Configuration.set_default(config) + + _, active_context = list_kube_config_contexts() + namespace = active_context["context"]["namespace"] + + dspas = CustomObjectsApi().list_namespaced_custom_object( + "datasciencepipelinesapplications.opendatahub.io", + "v1alpha1", + namespace, + "datasciencepipelinesapplications", + ) + + try: + dspa = dspas["items"][0] + except IndexError as e: + raise ApiException(404, "DataSciencePipelines resource not found") from e + + try: + if dspa["spec"]["dspVersion"] != "v2": + raise KeyError + except KeyError as e: + raise EnvironmentError( + "Installed version of Kubeflow Pipelines does not meet minimal version criteria. Use KFPv2 please." 
+ ) from e + + try: + host = dspa["status"]["components"]["apiServer"]["externalUrl"] + except KeyError as e: + raise ApiException( + 409, + "DataSciencePipelines resource is not ready. Check for .status.components.apiServer", + ) from e + + with warnings.catch_warnings(action="ignore"): + return Client(existing_token=token, host=host)