From 207ddfe424d3485da00cf28bf397e116d9bbb946 Mon Sep 17 00:00:00 2001
From: Daniela Plascencia
Date: Mon, 20 Nov 2023 14:40:56 +0100
Subject: [PATCH] tests: enable bundle functional tests for v2 compiled
 pipelines (#383)

* tests: enable bundle functional tests for v2 compiled pipelines

The recent upgrades in kubeflow pipelines introduced a new version of
the SDK that is compatible with the newer backend (pipelines 2.0.3).
This commit ensures that the CI tests both a v1 and a v2 compiled
pipeline for good coverage.
---
 .github/workflows/integrate.yaml              |   8 +-
 requirements-integration.in                   |   2 -
 requirements-integration.txt                  | 108 +-------
 tests/README.md                               |   5 +-
 tests/integration/conftest.py                 |  34 +--
 tests/integration/kfp_globals.py              |   9 +-
 .../pipelines/pipeline_container_no_input.py  |  38 +++
 .../pipeline_container_no_input.yaml          |  29 +++
 ...unctional.py => test_kfp_functional_v1.py} |  96 ++++----
 tests/integration/test_kfp_functional_v2.py   | 232 ++++++++++++++++++
 tox.ini                                       |  13 +-
 11 files changed, 387 insertions(+), 187 deletions(-)
 create mode 100644 tests/integration/pipelines/pipeline_container_no_input.py
 create mode 100644 tests/integration/pipelines/pipeline_container_no_input.yaml
 rename tests/integration/{test_kfp_functional.py => test_kfp_functional_v1.py} (73%)
 create mode 100644 tests/integration/test_kfp_functional_v2.py

diff --git a/.github/workflows/integrate.yaml b/.github/workflows/integrate.yaml
index 508148f5..66566cde 100644
--- a/.github/workflows/integrate.yaml
+++ b/.github/workflows/integrate.yaml
@@ -109,7 +109,11 @@ jobs:
   test-bundle:
     name: Test the bundle
    runs-on: ubuntu-20.04
-
+    strategy:
+      matrix:
+        sdk:
+          - v1
+          - v2
     steps:
       # This is a workaround for https://github.com/canonical/kfp-operators/issues/250
       # Ideally we'd use self-hosted runners, but this effort is still not stable
@@ -145,7 +149,7 @@
          # Run integration tests against the 1.7 generic install bundle definition
          # Using destructive mode because of https://github.com/canonical/charmcraft/issues/1132
          # and https://github.com/canonical/charmcraft/issues/1138
-          sg snap_microk8s -c "tox -e bundle-integration -- --model kubeflow --bundle=./tests/integration/bundles/kfp_latest_edge.yaml.j2 --destructive-mode"
+          sg snap_microk8s -c "tox -e bundle-integration-${{ matrix.sdk }} -- --model kubeflow --bundle=./tests/integration/bundles/kfp_latest_edge.yaml.j2 --destructive-mode"

       - name: Get all
         run: kubectl get all -A
diff --git a/requirements-integration.in b/requirements-integration.in
index 2e8d9484..0da56c83 100644
--- a/requirements-integration.in
+++ b/requirements-integration.in
@@ -4,8 +4,6 @@ aiohttp
 jsonschema<4.18
 # Pinning to <4.0 due to compatibility with the 3.1 controller version
 juju<4.0
-# Need this version of kfp to work with kfp 2.0.0-alpha.7
-kfp==1.8.22
 lightkube
 pytest
 pytest-operator
diff --git a/requirements-integration.txt b/requirements-integration.txt
index fcf50256..4ea9d935 100644
--- a/requirements-integration.txt
+++ b/requirements-integration.txt
@@ -4,8 +4,6 @@
 #
 # pip-compile requirements-integration.in
 #
-absl-py==1.4.0
-    # via kfp
 aiohttp==3.8.5
     # via -r requirements-integration.in
 aiosignal==1.3.1
     # via aiohttp
 anyio==3.7.1
     # via httpcore
 asttokens==2.4.0
     # via stack-data
 async-timeout==4.0.3
     # via aiohttp
 attrs==23.1.0
     # via
     #   aiohttp
     #   jsonschema
 bcrypt==4.0.1
     # via paramiko
 cachetools==5.3.1
     # via google-auth
 certifi==2023.7.22
     # via
     #   httpcore
     #   httpx
-    #   kfp-server-api
     #   kubernetes
     #   requests
 cffi==1.15.1
     # via
     #   cryptography
     #   pynacl
 charset-normalizer==3.2.0
     # via
     #   aiohttp
     #   requests
-click==8.1.7
-    # via
-    #   kfp
-    #   typer
-cloudpickle==2.2.1
-    # via kfp
 cryptography==41.0.3
     # via paramiko
 decorator==5.1.1
     # via
     #   ipdb
     #   ipython
-deprecated==1.2.14
-    # via kfp
-docstring-parser==0.15
-    # via kfp
 exceptiongroup==1.1.3
     # via
     #   anyio
     #   pytest
 executing==1.2.0
     # via stack-data
-fire==0.5.0
-    # via kfp
 frozenlist==1.4.0
     # via
     #   aiohttp
     #   aiosignal
-google-api-core==2.11.1
-    # via
-    #   google-api-python-client
-    #   google-cloud-core
-    #   google-cloud-storage
-    #   kfp
-google-api-python-client==1.12.11
-    # via kfp
 google-auth==2.22.0
-    # via
-    #   google-api-core
-    #   google-api-python-client
-    #   google-auth-httplib2
-    #   google-cloud-core
-    #   google-cloud-storage
-    #   kfp
-    #   kubernetes
-google-auth-httplib2==0.1.0
-    # via google-api-python-client
-google-cloud-core==2.3.3
-    # via google-cloud-storage
-google-cloud-storage==2.10.0
-    # via kfp
-google-crc32c==1.5.0
-    # via google-resumable-media
-google-resumable-media==2.6.0
-    # via google-cloud-storage
-googleapis-common-protos==1.60.0
-    # via google-api-core
+    # via kubernetes
 h11==0.14.0
     # via httpcore
 httpcore==0.17.3
     # via httpx
-httplib2==0.22.0
-    # via
-    #   google-api-python-client
-    #   google-auth-httplib2
 httpx==0.24.1
     # via lightkube
 hvac==1.2.0
     # via juju
 idna==3.4
     # via
     #   anyio
     #   httpx
     #   requests
     #   yarl
 ipdb==0.13.13
     # via -r requirements-integration.in
 ipython==8.15.0
     # via ipdb
 jedi==0.19.0
     # via ipython
 jinja2==3.1.2
     # via pytest-operator
 jsonschema==4.17.3
-    # via
-    #   -r requirements-integration.in
-    #   kfp
+    # via -r requirements-integration.in
 juju==3.2.2
     # via
     #   -r requirements-integration.in
     #   pytest-operator
-kfp==1.8.22
-    # via -r requirements-integration.in
-kfp-pipeline-spec==0.1.16
-    # via kfp
-kfp-server-api==1.8.5
-    # via kfp
 kubernetes==25.3.0
-    # via
-    #   juju
-    #   kfp
+    # via juju
 lightkube==0.14.0
     # via -r requirements-integration.in
 lightkube-models==1.28.1.4
     # via lightkube
 macaroonbakery==1.3.1
     # via juju
 markupsafe==2.1.3
     # via jinja2
 matplotlib-inline==0.1.6
     # via ipython
 multidict==6.0.4
     # via
     #   aiohttp
     #   yarl
 mypy-extensions==1.0.0
     # via typing-inspect
 oauthlib==3.2.2
     # via requests-oauthlib
 packaging==23.1
     # via pytest
 paramiko==2.12.0
     # via juju
 parso==0.8.3
     # via jedi
 pexpect==4.8.0
     # via ipython
 pickleshare==0.7.5
     # via ipython
 pluggy==1.3.0
     # via pytest
 prompt-toolkit==3.0.39
     # via ipython
 protobuf==3.20.3
-    # via
-    #   google-api-core
-    #   googleapis-common-protos
-    #   kfp
-    #   kfp-pipeline-spec
-    #   macaroonbakery
+    # via macaroonbakery
 ptyprocess==0.7.0
     # via pexpect
 pure-eval==0.2.2
     # via stack-data
 pyasn1==0.5.0
     # via
     #   pyasn1-modules
     #   rsa
 pyasn1-modules==0.3.0
     # via google-auth
 pycparser==2.21
     # via cffi
-pydantic==1.10.12
-    # via kfp
 pygments==2.16.1
     # via ipython
 pyhcl==0.4.5
     # via hvac
 pymacaroons==0.13.0
     # via juju
 pynacl==1.5.0
     # via
     #   juju
     #   macaroonbakery
     #   paramiko
     #   pymacaroons
-pyparsing==3.1.1
-    # via httplib2
 pyrfc3339==1.1
     # via
     #   juju
     #   macaroonbakery
 pytest==7.4.2
     # via
     #   -r requirements-integration.in
     #   pytest-asyncio
     #   pytest-operator
 pytest-asyncio==0.21.1
     # via pytest-operator
 pytest-operator==0.29.0
     # via -r requirements-integration.in
 python-dateutil==2.8.2
-    # via
-    #   kfp-server-api
-    #   kubernetes
+    # via kubernetes
 pytz==2023.3.post1
     # via pyrfc3339
 pyyaml==6.0.1
     # via
     #   -r requirements-integration.in
     #   juju
-    #   kfp
     #   kubernetes
     #   lightkube
     #   pytest-operator
 requests==2.31.0
     # via
-    #   google-api-core
-    #   google-cloud-storage
     #   hvac
     #   kubernetes
     #   macaroonbakery
     #   requests-oauthlib
-    #   requests-toolbelt
 requests-oauthlib==1.3.1
     # via kubernetes
-requests-toolbelt==0.10.1
-    # via kfp
 rsa==4.9
     # via google-auth
 six==1.16.0
     # via
     #   asttokens
-    #   fire
-    #   google-api-python-client
     #   google-auth
-    #   google-auth-httplib2
-    #   kfp-server-api
     #   kubernetes
     #   macaroonbakery
     #   paramiko
     #   pymacaroons
     #   python-dateutil
 sniffio==1.3.0
     # via
     #   anyio
     #   httpcore
     #   httpx
 stack-data==0.6.2
     # via ipython
-strip-hints==0.1.10
-    # via kfp
-tabulate==0.9.0
-    # via kfp
 tenacity==8.2.3
     # via -r requirements-integration.in
-termcolor==2.3.0
-    # via fire
 tomli==2.0.1
     # via
     #   ipdb
     #   pytest
 traitlets==5.9.0
     # via
     #   ipython
     #   matplotlib-inline
-typer==0.9.0
-    # via kfp
 typing-extensions==4.7.1
     # via
     #   ipython
-    #   kfp
-    #   pydantic
-    #   typer
     #   typing-inspect
 typing-inspect==0.9.0
     # via juju
-uritemplate==3.0.1
-    # via
-    #   google-api-python-client
-    #   kfp
 urllib3==1.26.16
     # via
     #   google-auth
-    #   kfp
-    #   kfp-server-api
     #   kubernetes
     #   requests
 wcwidth==0.2.6
     # via prompt-toolkit
@@ -325,10 +231,6 @@
 websocket-client==1.6.2
     # via kubernetes
 websockets==8.1
     # via juju
-wheel==0.41.2
-    # via strip-hints
-wrapt==1.15.0
-    # via deprecated
 yarl==1.9.2
     # via aiohttp
 zipp==3.16.2
diff --git a/tests/README.md b/tests/README.md
index 110be336..865b66ca 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -19,9 +19,8 @@ This directory has the following structure:
 │   ├── k8s_resources.py
 │   └── localize_bundle.py
 ├── kfp_globals.py
-├── pipelines
-│   ├── sample_pipeline.yaml
-│   └── sample_pipeline_execution_order.py
+├── pipelines/
+│   └── ... # Sample pipelines
 ├── profile
 │   └── profile.yaml
 ├── test_kfp_functional.py
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index c5e498cc..9b983f6b 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -30,7 +30,7 @@
 KUBEFLOW_USER_NAME = PROFILE_FILE["spec"]["owner"]["name"]


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def forward_kfp_ui():
     """Port forward the kfp-ui service."""
     kfp_ui_process = subprocess.Popen(
@@ -46,7 +46,7 @@ def forward_kfp_ui():
     kfp_ui_process.terminate()


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def apply_profile(lightkube_client):
     """Apply a Profile simulating a user."""
     # Create a Viewer namespaced resource
@@ -59,7 +59,7 @@ def apply_profile(lightkube_client):

     yield

-    # Remove namespace
+    # Remove profile
     read_yaml = Path(PROFILE_FILE_PATH).read_text()
     yaml_loaded = codecs.load_all_yaml(read_yaml)
     for obj in yaml_loaded:
@@ -73,7 +73,7 @@ def apply_profile(lightkube_client):
             raise api_error


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def kfp_client(apply_profile, forward_kfp_ui) -> kfp.Client:
     """Returns a KFP Client that can talk to the KFP API Server."""
     # Instantiate the KFP Client
@@ -82,37 +82,13 @@ def kfp_client(apply_profile, forward_kfp_ui) -> kfp.Client:
     return client


-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def lightkube_client() -> lightkube.Client:
     """Returns a lightkube Client that can talk to the K8s API."""
     client = lightkube.Client(field_manager="kfp-operators")
     return client


-@pytest.fixture(scope="function")
-def upload_and_clean_pipeline(kfp_client: kfp.Client):
-    """Upload an arbitrary pipeline and remove after test case execution."""
-    pipeline_upload_response = kfp_client.pipeline_uploads.upload_pipeline(
-        uploadfile=SAMPLE_PIPELINE, name=SAMPLE_PIPELINE_NAME
-    )
-
-    yield pipeline_upload_response
-
-    kfp_client.delete_pipeline(pipeline_id=pipeline_upload_response.id)
-
-
-@pytest.fixture(scope="function")
-def create_and_clean_experiment(kfp_client: kfp.Client):
-    """Create an experiment and remove after test case execution."""
-    experiment_response = kfp_client.create_experiment(
-        name="test-experiment", namespace=KUBEFLOW_PROFILE_NAMESPACE
-    )
-
-    yield experiment_response
-
-    kfp_client.delete_experiment(experiment_id=experiment_response.id)
-
-
 def pytest_addoption(parser: Parser):
     parser.addoption(
         "--bundle",
diff --git a/tests/integration/kfp_globals.py b/tests/integration/kfp_globals.py
index 5751f4f5..9f74dc5a 100644
--- a/tests/integration/kfp_globals.py
+++ b/tests/integration/kfp_globals.py
@@ -20,8 +20,13 @@
 ]

 # Variables for uploading/creating pipelines/experiments/runs
-SAMPLE_PIPELINE = f"{basedir}/tests/integration/pipelines/sample_pipeline.yaml"
-SAMPLE_PIPELINE_NAME = "sample-pipeline-2"
+SAMPLE_PIPELINES_PATH = f"{basedir}/tests/integration/pipelines"
+SAMPLE_PIPELINE = {
f"{SAMPLE_PIPELINES_PATH}/sample_pipeline.yaml", + "v2": f"{SAMPLE_PIPELINES_PATH}/pipeline_container_no_input.yaml", +} + +SAMPLE_PIPELINE_NAME = "sample-pipeline" # Variables for creating a viewer SAMPLE_VIEWER = f"{basedir}/tests/integration/viewer/mnist.yaml" diff --git a/tests/integration/pipelines/pipeline_container_no_input.py b/tests/integration/pipelines/pipeline_container_no_input.py new file mode 100644 index 00000000..ca430c68 --- /dev/null +++ b/tests/integration/pipelines/pipeline_container_no_input.py @@ -0,0 +1,38 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from kfp import compiler +from kfp import dsl + + +@dsl.container_component +def container_no_input(): + return dsl.ContainerSpec( + image="python:3.7", + command=["echo", "hello world"], + args=[], + ) + + +@dsl.pipeline(name="v2-container-component-no-input") +def pipeline_container_no_input(): + container_no_input() + + +if __name__ == "__main__": + # execute only if run as a script + compiler.Compiler().compile( + pipeline_func=pipeline_container_no_input, package_path="pipeline_container_no_input.yaml" + ) diff --git a/tests/integration/pipelines/pipeline_container_no_input.yaml b/tests/integration/pipelines/pipeline_container_no_input.yaml new file mode 100644 index 00000000..5bdd4279 --- /dev/null +++ b/tests/integration/pipelines/pipeline_container_no_input.yaml @@ -0,0 +1,29 @@ +# PIPELINE DEFINITION +# Name: v2-container-component-no-input +# This pipeline was generated using the sample_pipeline_execution_order.py in this directory +# Please do not edit this file directly +components: + comp-container-no-input: + executorLabel: exec-container-no-input +deploymentSpec: + executors: + exec-container-no-input: + container: + command: + - echo + - hello world + image: python:3.7 +pipelineInfo: + name: v2-container-component-no-input +root: + dag: + tasks: + container-no-input: + cachingOptions: + enableCache: true + componentRef: + name: comp-container-no-input + taskInfo: + name: container-no-input +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0 diff --git a/tests/integration/test_kfp_functional.py b/tests/integration/test_kfp_functional_v1.py similarity index 73% rename from tests/integration/test_kfp_functional.py rename to tests/integration/test_kfp_functional_v1.py index d5bc1d03..fa555270 100644 --- a/tests/integration/test_kfp_functional.py +++ b/tests/integration/test_kfp_functional_v1.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. 
-"""Functional tests for kfp-operators.""" +"""Functional tests for kfp-operators with the KFP SDK v1.""" import logging import time from pathlib import Path @@ -14,9 +14,11 @@ KFP_CHARMS, KUBEFLOW_PROFILE_NAMESPACE, SAMPLE_PIPELINE, + SAMPLE_PIPELINE_NAME, SAMPLE_VIEWER, ) +import kfp import lightkube import pytest import tenacity @@ -26,9 +28,42 @@ from pytest_operator.plugin import OpsTest +KFP_SDK_VERSION = "v1" log = logging.getLogger(__name__) +# ---- KFP SDK V1 fixtures +@pytest.fixture(scope="function") +def upload_and_clean_pipeline_v1(kfp_client: kfp.Client): + """Upload an arbitrary pipeline and remove after test case execution.""" + pipeline_upload_response = kfp_client.pipeline_uploads.upload_pipeline( + uploadfile=SAMPLE_PIPELINE[KFP_SDK_VERSION], name=SAMPLE_PIPELINE_NAME + ) + # The newer pipelines backend requires the deletion of the pipelines versions + # before we can actually remove the pipeline. This variable extracts the pipeline + # version id that can be used to remove it later in the test exectution. + pipeline_version_id = ( + kfp_client.list_pipeline_versions(pipeline_upload_response.id).versions[0].id + ) + + yield pipeline_upload_response + + kfp_client.delete_pipeline_version(pipeline_version_id) + kfp_client.delete_pipeline(pipeline_id=pipeline_upload_response.id) + + +@pytest.fixture(scope="function") +def create_and_clean_experiment_v1(kfp_client: kfp.Client): + """Create an experiment and remove after test case execution.""" + experiment_response = kfp_client.create_experiment( + name="test-experiment", namespace=KUBEFLOW_PROFILE_NAMESPACE + ) + + yield experiment_response + + kfp_client.delete_experiment(experiment_id=experiment_response.id) + + @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest, request, lightkube_client): """Build and deploy kfp-operators charms.""" @@ -77,32 +112,36 @@ async def test_build_and_deploy(ops_test: OpsTest, request, lightkube_client): idle_period=30, ) + # ---- KFP API Server focused test cases async def test_upload_pipeline(kfp_client): """Upload a pipeline from a YAML file and assert its presence.""" # Upload a pipeline and get the server response + pipeline_name = f"test-pipeline-sdk-{KFP_SDK_VERSION}" pipeline_upload_response = kfp_client.pipeline_uploads.upload_pipeline( - uploadfile=SAMPLE_PIPELINE, name="test-upload-pipeline" + uploadfile=SAMPLE_PIPELINE[KFP_SDK_VERSION], + name=pipeline_name, ) # Upload a pipeline and get its ID uploaded_pipeline_id = pipeline_upload_response.id # Get pipeline id by name, default='sample-pipeline' - server_pipeline_id = kfp_client.get_pipeline_id(name="test-upload-pipeline") + server_pipeline_id = kfp_client.get_pipeline_id(name=pipeline_name) assert uploaded_pipeline_id == server_pipeline_id -async def test_create_and_monitor_run(kfp_client, create_and_clean_experiment): + +async def test_create_and_monitor_run(kfp_client, create_and_clean_experiment_v1): """Create a run and monitor it to completion.""" # Create a run, save response in variable for easy manipulation # Create an experiment for this run - experiment_response = create_and_clean_experiment + experiment_response = create_and_clean_experiment_v1 # Create a run from a pipeline file (SAMPLE_PIPELINE) and an experiment (create_experiment). 
     # This call uses the 'default' kubeflow service account to be able to edit Workflows
     create_run_response = kfp_client.create_run_from_pipeline_package(
-        pipeline_file=SAMPLE_PIPELINE,
+        pipeline_file=SAMPLE_PIPELINE[KFP_SDK_VERSION],
         arguments={},
-        run_name="test-run-1",
+        run_name=f"test-run-sdk-{KFP_SDK_VERSION}",
         experiment_name=experiment_response.name,
         namespace=KUBEFLOW_PROFILE_NAMESPACE,
     )

@@ -111,33 +150,34 @@ async def test_create_and_monitor_run(kfp_client, create_and_clean_experiment):
     # Related issue: https://github.com/canonical/kfp-operators/issues/244
     # Monitor the run to completion, the pipeline should not be executed in
     # more than 300 seconds as it is a very simple operation
-    #monitor_response = kfp_client.wait_for_run_completion(create_run_response.run_id, timeout=600)
+    # monitor_response = kfp_client.wait_for_run_completion(create_run_response.run_id, timeout=600)

-    #assert monitor_response.run.status == "Succeeded"
+    # assert monitor_response.run.status == "Succeeded"

     # At least get the run and extract some data while the previous check
     # works properly on the GitHub runners
     test_run = kfp_client.get_run(create_run_response.run_id).run
     assert test_run is not None

+
 # ---- ScheduledWorfklows and Argo focused test case
 async def test_create_and_monitor_recurring_run(
-    kfp_client, upload_and_clean_pipeline, create_and_clean_experiment
+    kfp_client, upload_and_clean_pipeline_v1, create_and_clean_experiment_v1
 ):
     """Create a recurring run and monitor it to completion."""
     # Upload a pipeline from file
-    pipeline_response = upload_and_clean_pipeline
+    pipeline_response = upload_and_clean_pipeline_v1

     # Create an experiment for this run
-    experiment_response = create_and_clean_experiment
+    experiment_response = create_and_clean_experiment_v1

     # Create a recurring run from a pipeline (upload_pipeline_from_file) and an experiment (create_experiment)
     # This call uses the 'default' kubeflow service account to be able to edit ScheduledWorkflows
     # This ScheduledWorkflow (Recurring Run) will run once every two seconds
     create_recurring_run_response = kfp_client.create_recurring_run(
         experiment_id=experiment_response.id,
-        job_name="recurring-job-1",
+        job_name=f"recurring-job-{KFP_SDK_VERSION}",
         pipeline_id=pipeline_response.id,
         enabled=True,
         cron_expression="*/2 * * * * *",
@@ -163,33 +203,3 @@

     # Assert the job is disabled
     # assert recurring_job.enabled is False
-
-
-# ---- KFP Viewer and Visualization focused test cases
-async def test_apply_sample_viewer(lightkube_client):
-    """Test a Viewer can be applied and its presence is verified."""
-    # Create a Viewer namespaced resource
-    viewer_class_resource = create_namespaced_resource(
-        group="kubeflow.org", version="v1beta1", kind="Viewer", plural="viewers"
-    )
-
-    # Apply viewer
-    viewer_object = apply_manifests(lightkube_client, yaml_file_path=SAMPLE_VIEWER)
-
-    viewer = lightkube_client.get(
-        res=viewer_class_resource,
-        name=viewer_object.metadata.name,
-        namespace=viewer_object.metadata.namespace,
-    )
-    assert viewer is not None
-
-
-async def test_viz_server_healthcheck(ops_test: OpsTest):
-    """Run a healthcheck on the server endpoint."""
-    status = await ops_test.model.get_status()
-    units = status["applications"]["kfp-viz"]["units"]
-    url = units["kfp-viz/0"]["address"]
-    headers = {"kubeflow-userid": "user"}
-    result_status, result_text = await fetch_response(url=f"http://{url}:8888", headers=headers)
-
-    assert result_status == 200
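Note on the v2 module that follows: it mirrors test_kfp_functional_v1.py, and
the differences come from the renamed response fields in the v2 SDK client. A
rough side-by-side of the fields as used by these two test modules (taken
from the fixtures and tests themselves, not an exhaustive API map):

    v1 SDK                                       v2 SDK
    pipeline_upload_response.id                  pipeline_upload_response.pipeline_id
    list_pipeline_versions(...).versions         list_pipeline_versions(...).pipeline_versions
    experiment_response.id                       experiment_response.experiment_id
    experiment_response.name                     experiment_response.display_name
    monitor_response.run.status == "Succeeded"   monitor_response.state == "SUCCEEDED"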
diff --git a/tests/integration/test_kfp_functional_v2.py b/tests/integration/test_kfp_functional_v2.py
new file mode 100644
index 00000000..1fb15274
--- /dev/null
+++ b/tests/integration/test_kfp_functional_v2.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+"""Functional tests for kfp-operators with the KFP SDK v2."""
+import logging
+import time
+from pathlib import Path
+
+from helpers.bundle_mgmt import render_bundle, deploy_bundle
+from helpers.k8s_resources import apply_manifests, fetch_response
+from helpers.localize_bundle import get_resources_from_charm_file
+from kfp_globals import (
+    CHARM_PATH_TEMPLATE,
+    KFP_CHARMS,
+    KUBEFLOW_PROFILE_NAMESPACE,
+    SAMPLE_PIPELINE,
+    SAMPLE_PIPELINE_NAME,
+    SAMPLE_VIEWER,
+)
+
+import kfp
+import lightkube
+import pytest
+import tenacity
+from lightkube import codecs
+from lightkube.generic_resource import create_namespaced_resource
+from lightkube.resources.apps_v1 import Deployment
+from pytest_operator.plugin import OpsTest
+
+KFP_SDK_VERSION = "v2"
+log = logging.getLogger(__name__)
+
+
+# ---- KFP SDK V2 fixtures
+@pytest.fixture(scope="function")
+def upload_and_clean_pipeline_v2(kfp_client: kfp.Client):
+    """Upload an arbitrary v2 pipeline and remove after test case execution."""
+    pipeline_upload_response = kfp_client.pipeline_uploads.upload_pipeline(
+        uploadfile=SAMPLE_PIPELINE[KFP_SDK_VERSION], name=SAMPLE_PIPELINE_NAME
+    )
+    # The newer pipelines backend requires the deletion of the pipeline's versions
+    # before we can actually remove the pipeline. This variable extracts the pipeline
+    # version id that can be used to remove it later in the test execution.
+    pipeline_version_id = (
+        kfp_client.list_pipeline_versions(pipeline_upload_response.pipeline_id)
+        .pipeline_versions[0]
+        .pipeline_version_id
+    )
+
+    yield pipeline_upload_response, pipeline_version_id
+
+    kfp_client.delete_pipeline_version(pipeline_upload_response.pipeline_id, pipeline_version_id)
+    kfp_client.delete_pipeline(pipeline_upload_response.pipeline_id)
+
+
+@pytest.fixture(scope="function")
+def create_and_clean_experiment_v2(kfp_client: kfp.Client):
+    """Create an experiment and remove after test case execution."""
+    experiment_response = kfp_client.create_experiment(
+        name="test-experiment", namespace=KUBEFLOW_PROFILE_NAMESPACE
+    )
+
+    yield experiment_response
+
+    kfp_client.delete_experiment(experiment_id=experiment_response.experiment_id)
+
+
+@pytest.mark.abort_on_fail
+async def test_build_and_deploy(ops_test: OpsTest, request, lightkube_client):
+    """Build and deploy kfp-operators charms."""
+    no_build = request.config.getoption("no_build")
+
+    # Immediately raise an error if the model name is not kubeflow
+    if ops_test.model_name != "kubeflow":
+        raise ValueError("kfp must be deployed to namespace kubeflow")
+
+    # Get/load template bundle from command line args
+    bundlefile_path = Path(request.config.getoption("bundle"))
+    basedir = Path("./").absolute()
+
+    # Build the charms we need to build only if --no-build is not set
+    context = {}
+    if not no_build:
+        charms_to_build = {
+            charm: Path(CHARM_PATH_TEMPLATE.format(basedir=str(basedir), charm=charm))
+            for charm in KFP_CHARMS
+        }
+        log.info(f"Building charms for: {charms_to_build}")
+        built_charms = await ops_test.build_charms(*charms_to_build.values())
+        log.info(f"Built charms: {built_charms}")
+
+        for charm, charm_file in built_charms.items():
+            charm_resources = get_resources_from_charm_file(charm_file)
+            context.update([(f"{charm.replace('-', '_')}_resources", charm_resources)])
+            context.update([(f"{charm.replace('-', '_')}", charm_file)])
+
+    # Render kfp-operators bundle file with locally built charms and their resources
+    rendered_bundle = render_bundle(
+        ops_test, bundle_path=bundlefile_path, context=context, no_build=no_build
+    )
+
+    # Deploy the kfp-operators bundle from the rendered bundle file
+    await deploy_bundle(ops_test, bundle_path=rendered_bundle, trust=True)
+
+    # Wait for everything to be up. Note, at time of writing these charms would naturally go
+    # into blocked during deploy while waiting for each other to satisfy relations, so we don't
+    # raise_on_blocked.
+    await ops_test.model.wait_for_idle(
+        status="active",
+        raise_on_blocked=False,  # These apps block while waiting for each other to deploy/relate
+        raise_on_error=True,
+        timeout=3600,
+        idle_period=30,
+    )
+
+
+# ---- KFP API Server focused test cases
+async def test_upload_pipeline(kfp_client):
+    """Upload a pipeline from a YAML file and assert its presence."""
+    # Upload a pipeline and get the server response
+    pipeline_name = f"test-pipeline-sdk-{KFP_SDK_VERSION}"
+    pipeline_upload_response = kfp_client.pipeline_uploads.upload_pipeline(
+        uploadfile=SAMPLE_PIPELINE[KFP_SDK_VERSION],
+        name=pipeline_name,
+    )
+    # Upload a pipeline and get its ID
+    uploaded_pipeline_id = pipeline_upload_response.pipeline_id
+
+    # Get the pipeline id by name
+    server_pipeline_id = kfp_client.get_pipeline_id(name=pipeline_name)
+    assert uploaded_pipeline_id == server_pipeline_id
+
+
+async def test_create_and_monitor_run(kfp_client, create_and_clean_experiment_v2):
+    """Create a run and monitor it to completion."""
+    # Create a run, save response in variable for easy manipulation
+    # Create an experiment for this run
+    experiment_response = create_and_clean_experiment_v2
+
+    # Create a run from a pipeline file (SAMPLE_PIPELINE) and an experiment (create_experiment).
+    # This call uses the 'default' kubeflow service account to be able to edit Workflows
+    create_run_response = kfp_client.create_run_from_pipeline_package(
+        pipeline_file=SAMPLE_PIPELINE[KFP_SDK_VERSION],
+        arguments={},
+        run_name=f"test-run-sdk-{KFP_SDK_VERSION}",
+        experiment_name=experiment_response.display_name,
+        namespace=KUBEFLOW_PROFILE_NAMESPACE,
+    )
+
+    # Monitor the run to completion, the pipeline should not be executed in
+    # more than 300 seconds as it is a very simple operation
+    monitor_response = kfp_client.wait_for_run_completion(create_run_response.run_id, timeout=600)
+
+    assert monitor_response.state == "SUCCEEDED"
+
+
+# ---- ScheduledWorkflows and Argo focused test case
+async def test_create_and_monitor_recurring_run(
+    kfp_client, upload_and_clean_pipeline_v2, create_and_clean_experiment_v2
+):
+    """Create a recurring run and monitor it to completion."""
+
+    # Upload a pipeline from file
+    pipeline_response, pipeline_version_id = upload_and_clean_pipeline_v2
+
+    # Create an experiment for this run
+    experiment_response = create_and_clean_experiment_v2
+
+    # Create a recurring run from a pipeline (upload_pipeline_from_file) and an experiment (create_experiment)
+    # This call uses the 'default' kubeflow service account to be able to edit ScheduledWorkflows
+    # This ScheduledWorkflow (Recurring Run) will run once every two seconds
+    create_recurring_run_response = kfp_client.create_recurring_run(
+        experiment_id=experiment_response.experiment_id,
+        job_name=f"recurring-job-{KFP_SDK_VERSION}",
+        pipeline_id=pipeline_response.pipeline_id,
+        version_id=pipeline_version_id,
+        enabled=True,
+        cron_expression="*/2 * * * * *",
+        max_concurrency=1,
+    )
+
+    recurring_job = create_recurring_run_response
+
+    # Assert the job is enabled
+    assert recurring_job.status == "ENABLED"
+
+    # Assert the job executes once every two seconds
+    assert recurring_job.trigger.cron_schedule.cron == "*/2 * * * * *"
+
+    # Wait for the recurring job to schedule some runs
+    time.sleep(6)
+
+    # FIXME: disabling the job does not work at the moment, it seems like
+    # the status of the recurring run is never updated and is causing the
+    # following assertion to fail
+    # Related issue: https://github.com/canonical/kfp-operators/issues/244
+    # Disable the job after a few runs
+
+    kfp_client.disable_recurring_run(recurring_job.recurring_run_id)
+
+    # Assert the job is disabled
+    # assert recurring_job.status is "DISABLED"
+
+
+# ---- KFP Viewer and Visualization focused test cases
+async def test_apply_sample_viewer(lightkube_client):
+    """Test a Viewer can be applied and its presence is verified."""
+    # Create a Viewer namespaced resource
+    viewer_class_resource = create_namespaced_resource(
+        group="kubeflow.org", version="v1beta1", kind="Viewer", plural="viewers"
+    )
+
+    # Apply viewer
+    viewer_object = apply_manifests(lightkube_client, yaml_file_path=SAMPLE_VIEWER)
+
+    viewer = lightkube_client.get(
+        res=viewer_class_resource,
+        name=viewer_object.metadata.name,
+        namespace=viewer_object.metadata.namespace,
+    )
+    assert viewer is not None
+
+
+async def test_viz_server_healthcheck(ops_test: OpsTest):
+    """Run a healthcheck on the server endpoint."""
+    status = await ops_test.model.get_status()
+    units = status["applications"]["kfp-viz"]["units"]
+    url = units["kfp-viz/0"]["address"]
+    headers = {"kubeflow-userid": "user"}
+    result_status, result_text = await fetch_response(url=f"http://{url}:8888", headers=headers)
+
+    assert result_status == 200
diff --git a/tox.ini b/tox.ini
index f353c78d..8fb7d2e4 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,7 +3,7 @@
 [tox]
 skipsdist=True
 skip_missing_interpreters = True
-envlist = {kfp-api,kfp-metadata-writer,kfp-persistence,kfp-profile-controller,kfp-schedwf,kfp-ui,kfp-viewer,kfp-viz}-{lint,unit,integration},bundle-integration
+envlist = {kfp-api,kfp-metadata-writer,kfp-persistence,kfp-profile-controller,kfp-schedwf,kfp-ui,kfp-viewer,kfp-viz}-{lint,unit,integration},bundle-integration-{v1, v2}

 [vars]
 tst_path = {toxinidir}/tests/
@@ -41,7 +41,14 @@ deps =
     pip-tools
 description = Update requirements files by executing pip-compile on all requirements*.in files, including those in subdirs.

-[testenv:bundle-integration]
-commands = pytest -vv --tb=native -s {posargs} {[vars]tst_path}/integration
+[testenv:bundle-integration-v1]
+commands = pytest -vv --tb=native -s {posargs} {[vars]tst_path}integration/test_kfp_functional_v1.py
 deps =
     -r requirements-integration.txt
+    kfp>=1.8,<2.0
+
+[testenv:bundle-integration-v2]
+commands = pytest -vv --tb=native -s {posargs} {[vars]tst_path}integration/test_kfp_functional_v2.py
+deps =
+    -r requirements-integration.txt
+    kfp>=2.4,<3.0
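Note: to exercise the new environments locally the same way the updated
workflow does (assuming a bootstrapped Juju controller with a `kubeflow`
model, which the tests require):

    tox -e bundle-integration-v1 -- --model kubeflow --bundle=./tests/integration/bundles/kfp_latest_edge.yaml.j2 --destructive-mode
    tox -e bundle-integration-v2 -- --model kubeflow --bundle=./tests/integration/bundles/kfp_latest_edge.yaml.j2 --destructive-mode

Each environment layers its own SDK pin (kfp>=1.8,<2.0 for v1, kfp>=2.4,<3.0
for v2) on top of requirements-integration.txt, which no longer pins kfp
itself.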