Skip to content

Commit

Permalink
Hotfix kubeflow pipelines upload (#40)
Browse files Browse the repository at this point in the history
Kubeflow requires unique pipeline names when uploading pipeline. 

Previous implementations relied on deleting the pipeline using the sdk
by finding if it exits using either the `client.list_pipelines()` and
filtering on the pipeline name or the
`client.get_pipeline_id(pipeline_name)` and then finding existing
versions from the the id, deleting them before deleting the pipeline.

Recent issues started appearing when pipelines were compiled with the v2
SDK. Both functions mentioned above stopped working. Error message:


`Failed to list pipelines with context \u0026{0xc0001ee9a0}, options
\u0026{100 0xc0012d1880}: InternalServerError: Failed to execute SQL for
listing pipelines: Error 1064: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the
right syntax to use near '(PARTITION BY PipelineId ORDER BY
CreatedAtInSec DESC) rn FROM pipeline_versions' at line 1`

The issue is not related to connection to the SQL server where the
pipelines servers are used since methods like `client.get_experiments()`
seem to work just fine. Also using the kfp v2 sdk seems to be also able
to return the pipeline versions. I suspect something changed internally
in the database when we were testing things the v2 sdk but it's not
clear. The issue should be resolved when starting from a clean slate
(new kfp cluster and database).

For now, I implemented a fix that requires passing the `pipeline_id`
(found in the UI) in order to delete existing pipelines before
uploading. I was able to circumvent the api calls that were causing the
issue. Not an ideal permanent solution but a temporary workaround.
  • Loading branch information
PhilippeMoussalli authored Apr 25, 2023
1 parent 474e1ce commit 18ff584
Showing 1 changed file with 26 additions and 16 deletions.
42 changes: 26 additions & 16 deletions fondant/pipeline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@

import os
import logging
from typing import Callable
from typing import Callable, Optional

from fondant.import_utils import is_kfp_available

if is_kfp_available():
import kfp
from kfp_server_api.exceptions import ApiException

logger = logging.getLogger(__name__)


def compile_and_upload_pipeline(
pipeline: Callable[[], None], host: str, env: str
pipeline: Callable[[], None], host: str, env: str, pipeline_id: Optional[str] = None
) -> None:
"""Upload pipeline to kubeflow.
Args:
pipeline (Callable): function that contains the pipeline definition
host (str): the url host for kfp
env (str): the project run environment (sbx, dev, prd)
pipeline: function that contains the pipeline definition
host: the url host for kfp
env: the project run environment (sbx, dev, prd)
pipeline_id: pipeline id of existing component under the same name. Pass it on when you
want to delete current existing pipelines
"""
client = kfp.Client(host=host)

Expand All @@ -28,17 +31,24 @@ def compile_and_upload_pipeline(
kfp.compiler.Compiler().compile(
pipeline_func=pipeline, package_path=pipeline_filename
)

# existing_pipelines = client.list_pipelines(page_size=100).pipelines
# for existing_pipeline in existing_pipelines:
# if existing_pipeline.name == pipeline_name:
# # Delete existing pipeline before uploading
# logger.warning(
# f"Pipeline {pipeline_name} already exists. Deleting old pipeline..."
# )
# client.delete_pipeline_version(existing_pipeline.default_version.id)
# client.delete_pipeline(existing_pipeline.id)
client.get
if pipeline_id is not None:
pipeline_versions = client.list_pipeline_versions(pipeline_id).versions
if pipeline_versions is not None:
versions_ids = [getattr(version, "id") for version in pipeline_versions]
for version_id in versions_ids:
client.delete_pipeline_version(version_id)
logger.info("Found existing pipeline under the same. Deleting the pipeline")
client.delete_pipeline(pipeline_id)

logger.info(f"Uploading pipeline: {pipeline_name}")
client.upload_pipeline(pipeline_filename, pipeline_name=pipeline_name)

try:
client.upload_pipeline(pipeline_filename, pipeline_name=pipeline_name)
except Exception as e:
raise ApiException(
f"Failed to upload the pipeline. Make sure that the pipeline {pipeline_name} does"
f" not exist. If you have a pipeline under a similar name, pass in the `pipeline id`"
f" in order to delete the existing pipeline"
) from e
os.remove(pipeline_filename)

0 comments on commit 18ff584

Please sign in to comment.