diff --git a/examples/pipelines/config.py b/examples/pipelines/config.py
index d13fda280..563be6c2d 100644
--- a/examples/pipelines/config.py
+++ b/examples/pipelines/config.py
@@ -30,4 +30,4 @@ class KubeflowConfig(GeneralConfig):
     ARTIFACT_BUCKET = f"{GeneralConfig.GCP_PROJECT_ID}-kfp-output"
     CLUSTER_NAME = "kfp-express"
     CLUSTER_ZONE = "europe-west4-a"
-    HOST = "https://472c61c751ab9be9-dot-europe-west1.pipelines.googleusercontent.com"
+    HOST = "https://52074149b1563463-dot-europe-west1.pipelines.googleusercontent.com/"
diff --git a/examples/pipelines/simple_pipeline/build_images.sh b/examples/pipelines/simple_pipeline/build_images.sh
new file mode 100644
index 000000000..9e8204dd4
--- /dev/null
+++ b/examples/pipelines/simple_pipeline/build_images.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+function usage {
+  echo "Usage: $0 [options]"
+  echo "Options:"
+  echo "  -c, --component   Set the component name. Pass a component folder name to build that specific component, or 'all' to build all components in the current directory (required)"
+  echo "  -n, --namespace   Set the namespace (default: ml6team)"
+  echo "  -r, --repo        Set the repo (default: express)"
+  echo "  -t, --tag         Set the tag (default: latest)"
+  echo "  -h, --help        Display this help message"
+}
+
+# Parse the arguments
+while [[ "$#" -gt 0 ]]; do case $1 in
+  -n|--namespace) namespace="$2"; shift;;
+  -r|--repo) repo="$2"; shift;;
+  -t|--tag) tag="$2"; shift;;
+  -c|--component) component="$2"; shift;;
+  -h|--help) usage; exit;;
+  *) echo "Unknown parameter passed: $1"; exit 1;;
+esac; shift; done
+
+# Check for required argument
+if [ -z "${component}" ]; then
+  echo "Error: component parameter is required"
+  usage
+  exit 1
+fi
+
+# Set default values for optional arguments if not passed
+[ -n "${namespace-}" ] || namespace="ml6team"
+[ -n "${repo-}" ] || repo="express"
+[ -n "${tag-}" ] || tag="latest"
+
+# Get the component directory
+component_dir=$(pwd)/"components"
+
+# Loop through all subdirectories
+for dir in "$component_dir"/*/; do
+  cd "$dir"
+  BASENAME=${dir%/}
+  BASENAME=${BASENAME##*/}
+  # Build all images or one image depending on the passed argument
+  if [[ "$BASENAME" == "${component}" ]] || [[ "${component}" == "all" ]]; then
+    full_image_name=ghcr.io/${namespace}/${BASENAME}:${tag}
+    echo "$full_image_name"
+    docker build -t "$full_image_name" \
+      --build-arg COMMIT_SHA=$(git rev-parse HEAD) \
+      --build-arg GIT_BRANCH=$(git rev-parse --abbrev-ref HEAD) \
+      --build-arg BUILD_TIMESTAMP=$(date '+%F_%H:%M:%S') \
+      --label org.opencontainers.image.source=https://github.com/${namespace}/${repo} \
+      --platform=linux/arm64 \
+      .
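+    # Push the freshly built image to GitHub Container Registry (ghcr.io) under the
+    # given namespace; this assumes you have already authenticated, e.g. via `docker login ghcr.io`.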
+ docker push "$full_image_name" + fi + cd "$component_dir" +done \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/embedding/src/fondant_component.yaml b/examples/pipelines/simple_pipeline/components/embedding/src/fondant_component.yaml new file mode 100644 index 000000000..7ea842992 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/embedding/src/fondant_component.yaml @@ -0,0 +1,23 @@ +name: Embedding +description: Component that embeds images using CLIP +image: embedding:latest + +input_subsets: + images: + fields: + data: + type: binary + +output_subsets: + embeddings: + fields: + data: + type: float + +args: + model_id: + description: Model id on the Hugging Face hub + type: str + batch_size: + description: Batch size to use when embedding + type: int \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/image_filtering/Dockerfile b/examples/pipelines/simple_pipeline/components/image_filtering/Dockerfile new file mode 100644 index 000000000..49b7639f9 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/image_filtering/Dockerfile @@ -0,0 +1,29 @@ +FROM --platform=linux/amd64 python:3.8-slim + +## System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git curl -y + +# Downloading gcloud package +RUN curl https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz > /tmp/google-cloud-sdk.tar.gz + +# Installing the package +RUN mkdir -p /usr/local/gcloud \ +&& tar -C /usr/local/gcloud -xvf /tmp/google-cloud-sdk.tar.gz \ +&& /usr/local/gcloud/google-cloud-sdk/install.sh + +# Adding the package path to local +ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin + +# install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Copy over src-files of the component +COPY src /src + +# Set the working directory to the source folder +WORKDIR /src + +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/image_filtering/kubeflow_component.yaml b/examples/pipelines/simple_pipeline/components/image_filtering/kubeflow_component.yaml new file mode 100644 index 000000000..35570c9e6 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/image_filtering/kubeflow_component.yaml @@ -0,0 +1,35 @@ +name: image_filtering +description: A component that filters images +inputs: + - name: input_manifest_path + description: Path to the input manifest + type: String + + - name: min_width + description: Desired minimum width + type: Integer + + - name: min_height + description: Desired minimum height + type: Integer + + - name: metadata + description: Metadata arguments, passed as a json dict string + type: String + + +outputs: + - name: output_manifest_path + description: Path to the output manifest + +implementation: + container: + image: ghcr.io/ml6team/image_filtering:latest + command: [ + python3, main.py, + --input_manifest_path, {inputPath: input_manifest_path}, + --min_width, {inputValue: min_width}, + --min_height, {inputValue: min_height}, + --metadata, {inputValue: metadata}, + --output_manifest_path, {outputPath: output_manifest_path}, + ] \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/image_filtering/requirements.txt b/examples/pipelines/simple_pipeline/components/image_filtering/requirements.txt new file mode 100644 index 000000000..93ca329f4 --- /dev/null +++ 
b/examples/pipelines/simple_pipeline/components/image_filtering/requirements.txt @@ -0,0 +1 @@ +git+https://github.com/ml6team/express.git@b1855308ca9251da5ddd8e6b88c34bc1c082a71b#egg=express \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/image_filtering/src/fondant_component.yaml b/examples/pipelines/simple_pipeline/components/image_filtering/src/fondant_component.yaml new file mode 100644 index 000000000..b4321bf7d --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/image_filtering/src/fondant_component.yaml @@ -0,0 +1,27 @@ +name: Image filtering +description: Component that filters images based on desired minimum width and height +image: image_filtering:latest + +input_subsets: + images: + fields: + width: + type: int16 + height: + type: int16 + +output_subsets: + images: + fields: + width: + type: int16 + height: + type: int16 + +args: + min_width: + description: Desired minimum width + type: int + min_height: + description: Desired minimum height + type: int \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/image_filtering/src/main.py b/examples/pipelines/simple_pipeline/components/image_filtering/src/main.py new file mode 100644 index 000000000..ced9e2e79 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/image_filtering/src/main.py @@ -0,0 +1,38 @@ +""" +This component filters images of the dataset based on image size (minimum height and width). +""" +import logging +from typing import Dict + +import dask.dataframe as dd + +from express.dataset import FondantComponent +from express.logger import configure_logging + +configure_logging() +logger = logging.getLogger(__name__) + + +class ImageFilterComponent(FondantComponent): + """ + Component that filters images based on height and width. 
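+    Expects the dataframe to contain the `images_width` and `images_height` columns
+    produced by the load_from_hub component, and keeps only the rows that satisfy both minima.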
+ """ + def process(self, dataset: dd.DataFrame, args: Dict) -> dd.DataFrame: + """ + Args: + dataset + args: args to pass to the function + + Returns: + dataset + """ + logger.info("Filtering dataset...") + min_width, min_height = args.min_width, args.min_height + filtered_dataset = dataset.filter(lambda example: example["images_width"] > min_width and example["images_height"] > min_height) + + return filtered_dataset + + +if __name__ == "__main__": + component = ImageFilterComponent() + component.run() \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/load_from_hub/Dockerfile b/examples/pipelines/simple_pipeline/components/load_from_hub/Dockerfile new file mode 100644 index 000000000..49b7639f9 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/load_from_hub/Dockerfile @@ -0,0 +1,29 @@ +FROM --platform=linux/amd64 python:3.8-slim + +## System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git curl -y + +# Downloading gcloud package +RUN curl https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz > /tmp/google-cloud-sdk.tar.gz + +# Installing the package +RUN mkdir -p /usr/local/gcloud \ +&& tar -C /usr/local/gcloud -xvf /tmp/google-cloud-sdk.tar.gz \ +&& /usr/local/gcloud/google-cloud-sdk/install.sh + +# Adding the package path to local +ENV PATH $PATH:/usr/local/gcloud/google-cloud-sdk/bin + +# install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Copy over src-files of the component +COPY src /src + +# Set the working directory to the source folder +WORKDIR /src + +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/load_from_hub/__init__.py b/examples/pipelines/simple_pipeline/components/load_from_hub/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/pipelines/simple_pipeline/components/load_from_hub/kubeflow_component.yaml b/examples/pipelines/simple_pipeline/components/load_from_hub/kubeflow_component.yaml new file mode 100644 index 000000000..19775581e --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/load_from_hub/kubeflow_component.yaml @@ -0,0 +1,30 @@ +name: load_from_hub +description: A component that takes a dataset name from the 🤗 hub as input and uploads it to a GCS bucket. 
+inputs: + - name: dataset_name + description: Name of dataset on the hub + type: String + + - name: batch_size + description: Batch size to use to create image metadata + type: Integer + + - name: metadata + description: Metadata arguments, passed as a json dict string + type: String + + +outputs: + - name: output_manifest_path + description: Path to the output manifest + +implementation: + container: + image: ghcr.io/ml6team/load_from_hub:latest + command: [ + python3, main.py, + --dataset_name, {inputValue: dataset_name}, + --batch_size, {inputValue: batch_size}, + --metadata, {inputValue: metadata}, + --output_manifest_path, {outputPath: output_manifest_path}, + ] \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/load_from_hub/requirements.txt b/examples/pipelines/simple_pipeline/components/load_from_hub/requirements.txt new file mode 100644 index 000000000..d30ff6384 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/load_from_hub/requirements.txt @@ -0,0 +1,4 @@ +datasets==2.11.0 +git+https://github.com/ml6team/express.git@8ecfb9fcaf0b8d457626179fe44347df829b8979#egg=express +Pillow==9.4.0 +gcsfs==2023.4.0 \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/load_from_hub/src/fondant_component.yaml b/examples/pipelines/simple_pipeline/components/load_from_hub/src/fondant_component.yaml new file mode 100644 index 000000000..d6d1b93f4 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/load_from_hub/src/fondant_component.yaml @@ -0,0 +1,31 @@ +name: Load from hub +description: Component that loads a dataset from the hub +image: load_from_hub:latest + +input_subsets: + images: + fields: + data: + type: binary + +output_subsets: + images: + fields: + data: + type: binary + width: + type: int16 + height: + type: int16 + captions: + fields: + data: + type: utf8 + +args: + dataset_name: + description: Name of dataset on the hub + type: str + batch_size: + description: Batch size to use to create image metadata + type: int \ No newline at end of file diff --git a/examples/pipelines/simple_pipeline/components/load_from_hub/src/main.py b/examples/pipelines/simple_pipeline/components/load_from_hub/src/main.py new file mode 100644 index 000000000..5ac296345 --- /dev/null +++ b/examples/pipelines/simple_pipeline/components/load_from_hub/src/main.py @@ -0,0 +1,79 @@ +""" +This component loads a seed dataset from the hub. 
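+It converts the 🤗 dataset to a Dask dataframe, adds an index plus image width and
+height columns, and lets the FondantComponent base class write the result to the
+locations described by the output manifest.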
+""" +import io +import logging + +from datasets import load_dataset + +import dask.dataframe as dd +import dask.array as da +import numpy as np +from PIL import Image + +from express.dataset import FondantComponent +from express.logger import configure_logging + +configure_logging() +logger = logging.getLogger(__name__) + + +def extract_width(image_bytes): + # Decode image bytes to PIL Image object + pil_image = Image.open(io.BytesIO(image_bytes)) + width = pil_image.size[0] + + return np.int16(width) + + +def extract_height(image_bytes): + # Decode image bytes to PIL Image object + pil_image = Image.open(io.BytesIO(image_bytes)) + height = pil_image.size[1] + + return np.int16(height) + + +class LoadFromHubComponent(FondantComponent): + + def load(self, args): + """ + Args: + args: additional arguments passed to the component + + Returns: + Dataset: HF dataset + """ + # 1) Load data, read as Dask dataframe + logger.info("Loading dataset from the hub...") + dataset = load_dataset(args.dataset_name, split="train") + dataset.to_parquet("data.parquet") + dask_df = dd.read_parquet("data.parquet") + + # 2) Add index to the dataframe + logger.info("Creating index...") + index_list = [idx for idx in range(len(dask_df))] + source_list = ["hub" for _ in range(len(dask_df))] + + dask_df["id"] = da.array(index_list) + dask_df["source"] = da.array(source_list) + + # 3) Rename columns + dask_df = dask_df.rename(columns={"image": "images_data", "text": "captions_data"}) + + # 4) Make sure images are bytes instead of dicts + dask_df["images_data"] = dask_df["images_data"].map(lambda x: x["bytes"], + meta=("bytes", bytes)) + + # 5) Add width and height columns + dask_df['images_width'] = dask_df['images_data'].map(extract_width, + meta=("images_width", int)) + dask_df['images_height'] = dask_df['images_data'].map(extract_height, + meta=("images_height", int)) + + return dask_df + + +if __name__ == "__main__": + component = LoadFromHubComponent(type="load") + component.run() diff --git a/examples/pipelines/simple_pipeline/config/components_config.py b/examples/pipelines/simple_pipeline/config/components_config.py new file mode 100644 index 000000000..f32b03394 --- /dev/null +++ b/examples/pipelines/simple_pipeline/config/components_config.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass + + +@dataclass +class LoadFromHubConfig: + """ + Configs for the dataset loader component + Params: + DATASET_NAME (str): Name of the dataset on the hub. + BATCH_SIZE (int): Batch size to use when creating image metadata. + """ + + DATASET_NAME = "lambdalabs/pokemon-blip-captions" + BATCH_SIZE = 1000 + + +@dataclass +class ImageFilterConfig: + """ + Configs for the image filter component + Params: + MIN_HEIGHT (int): Minimum height for each image. + MIN_WIDTH (int): Minimum width for each image. + """ + + MIN_HEIGHT = 200 + MIN_WIDTH = 200 + + +@dataclass +class EmbeddingConfig: + """ + Configs for the embedding component + Params: + MODEL_ID (int): HF model id to use for embedding. + BATCH_SIZE (int): Batch size to use when embedding. 
+ """ + + MODEL_ID = "openai/clip-vit-large-patch14" + BATCH_SIZE = 10 diff --git a/examples/pipelines/simple_pipeline/config/general_config.py b/examples/pipelines/simple_pipeline/config/general_config.py new file mode 100644 index 000000000..29633bd4d --- /dev/null +++ b/examples/pipelines/simple_pipeline/config/general_config.py @@ -0,0 +1,33 @@ +"""General config""" + +import os + +from dataclasses import dataclass + +@dataclass +class GeneralConfig: + """ + General configs + Params: + GCP_PROJECT_ID (str): GCP project ID + DATASET_NAME (str): name of the Hugging Face dataset + ENV (str): the project run environment (sbx, dev, prd) + """ + GCP_PROJECT_ID = "soy-audio-379412" + ENV = os.environ.get('ENV', 'dev') + + +@dataclass +class KubeflowConfig(GeneralConfig): + """ + Configs for the Kubeflow cluster + Params: + ARTIFACT_BUCKET (str): the GCS bucket used to store the artifacts + CLUSTER_NAME (str): the name of the k8 cluster hosting KFP + CLUSTER_ZONE (str): the zone of the k8 cluster hosting KFP + HOST (str): the kfp host url + """ + ARTIFACT_BUCKET = f"{GeneralConfig.GCP_PROJECT_ID}_kfp-artifacts" + CLUSTER_NAME = "kfp-express" + CLUSTER_ZONE = "europe-west4-a" + HOST = "https://52074149b1563463-dot-europe-west1.pipelines.googleusercontent.com" diff --git a/examples/pipelines/simple_pipeline/simple_pipeline.py b/examples/pipelines/simple_pipeline/simple_pipeline.py new file mode 100644 index 000000000..f6bf3d617 --- /dev/null +++ b/examples/pipelines/simple_pipeline/simple_pipeline.py @@ -0,0 +1,75 @@ +"""Pipeline used to create a stable diffusion dataset from a set of given images.""" +# pylint: disable=import-error +import json + +from kfp import components as comp +from kfp import dsl + +from config.general_config import GeneralConfig, KubeflowConfig +from config.components_config import ( + LoadFromHubConfig, + ImageFilterConfig, +) +from express.pipeline_utils import compile_and_upload_pipeline + +# Load Components +artifact_bucket = KubeflowConfig.ARTIFACT_BUCKET + "/custom_artifact" +run_id = "{{workflow.name}}" + +# Component 1 +load_from_hub_op = comp.load_component("components/load_from_hub/kubeflow_component.yaml") +# Component 2 +image_filtering_op = comp.load_component("components/image_filtering/kubeflow_component.yaml") + +load_from_hub_metadata = { + "base_path": artifact_bucket, + "run_id": run_id, + "component_id": load_from_hub_op.__name__, +} +image_filtering_metadata = { + "base_path": artifact_bucket, + "run_id": run_id, + "component_id": image_filtering_op.__name__, +} +load_from_hub_metadata = json.dumps(load_from_hub_metadata) +image_filtering_metadata = json.dumps(image_filtering_metadata) + + +# Pipeline +@dsl.pipeline( + name="image-generator-dataset", + description="Pipeline that takes example images as input and returns an expanded dataset of " + "similar images as outputs", +) +# pylint: disable=too-many-arguments, too-many-locals +def sd_dataset_creator_pipeline( + load_from_hub_dataset_name: str = LoadFromHubConfig.DATASET_NAME, + load_from_hub_batch_size: int = LoadFromHubConfig.BATCH_SIZE, + load_from_hub_metadata: str = load_from_hub_metadata, + image_filter_min_width: int = ImageFilterConfig.MIN_WIDTH, + image_filter_min_height: int = ImageFilterConfig.MIN_HEIGHT, + image_filtering_metadata: str = image_filtering_metadata, +): + + # Component 1 + load_from_hub_task = load_from_hub_op( + dataset_name=load_from_hub_dataset_name, + batch_size=load_from_hub_batch_size, + metadata=load_from_hub_metadata, + ).set_display_name("Load initial images") 
+ + # Component 2 + image_filter_task = image_filtering_op( + input_manifest_path=load_from_hub_task.outputs["output_manifest_path"], + min_width=image_filter_min_width, + min_height=image_filter_min_height, + metadata=image_filtering_metadata, + ).set_display_name("Filter images") + + +if __name__ == "__main__": + compile_and_upload_pipeline( + pipeline=sd_dataset_creator_pipeline, + host=KubeflowConfig.HOST, + env=KubeflowConfig.ENV, + ) \ No newline at end of file diff --git a/express/component_spec.py b/express/component_spec.py index 00b1eeaf6..1bdac568b 100644 --- a/express/component_spec.py +++ b/express/component_spec.py @@ -15,30 +15,26 @@ from express.exceptions import InvalidComponentSpec from express.schema import Field - -def get_kubeflow_type(python_type: str): - """ - Function that returns a Kubeflow equivalent data type from a Python data type - Args: - python_type (str): the string representation of the data type - Returns: - The kubeflow data type - """ - mapping = { - "str": "String", - "int": "Integer", - "float": "Float", - "bool": "Boolean", - "dict": "Map", - "list": "List", - "tuple": "List", - "set": "Set", - } - - try: - return mapping[python_type] - except KeyError: - raise ValueError(f"Invalid Python data type: {python_type}") +# TODO: Change after upgrading to kfp v2 +# :https://www.kubeflow.org/docs/components/pipelines/v2/data-types/parameters/ +python2kubeflow_type = { + "str": "String", + "int": "Integer", + "float": "Float", + "bool": "Boolean", + "dict": "Map", + "list": "List", + "tuple": "List", + "set": "Set", +} + +# TODO: remove after upgrading to kfpv2 +kubeflow2python_type = { + "String": str, + "Integer": int, + "Float": float, + "Boolean": bool, +} @dataclass @@ -176,9 +172,9 @@ def fields(self) -> t.Mapping[str, Field]: ) -class ExpressComponent: +class ComponentSpec: """ - Class representing an Express component + Class representing an Express component specification. Args: yaml_spec_path: The yaml file containing the component specification """ @@ -230,7 +226,7 @@ def kubeflow_component_specification(self) -> t.Dict[str, any]: KubeflowInput( name=arg_name.strip(), description=arg_info["description"].strip(), - type=get_kubeflow_type(arg_info["type"].strip()), + type=python2kubeflow_type[arg_info["type"].strip()], ) ) @@ -329,10 +325,9 @@ def output_arguments(self): """The output arguments of the component as an immutable mapping""" return types.MappingProxyType( { - info["name"]: KubeflowInput( + info["name"]: KubeflowOutput( name=info["name"], description=info["description"], - type=info["type"], ) for info in self.express_component_specification["outputs"] } diff --git a/express/dataset.py b/express/dataset.py new file mode 100644 index 000000000..22c9fba5b --- /dev/null +++ b/express/dataset.py @@ -0,0 +1,216 @@ +"""This module defines the FondantDataset class, which is a wrapper around the manifest. + +It also defines the FondantComponent class, which uses the FondantDataset class to manipulate data. +""" + +from abc import abstractmethod +import argparse +import json +from pathlib import Path +from typing import List, Mapping + +import dask.dataframe as dd + +from express.component_spec import ComponentSpec, kubeflow2python_type +from express.manifest import Manifest +from express.schema import Type, Field + + +class FondantDataset: + """Wrapper around the manifest to download and upload data into a specific framework. + + Uses Dask Dataframes for the moment. 
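+    Subsets are read from and written to parquet datasets at the locations
+    recorded in the manifest (currently assumed to live on GCS).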
+ """ + + def __init__(self, manifest: Manifest): + self.manifest = manifest + self.mandatory_subset_columns = ["id", "source"] + self.index_schema = { + "source": "string", + "id": "int64", + "__null_dask_index__": "int64", + } + + def _load_subset(self, name: str, fields: List[str]) -> dd.DataFrame: + # get subset from the manifest + subset = self.manifest.subsets[name] + # TODO remove prefix + location = "gcs://" + subset.location + + df = dd.read_parquet( + location, + columns=fields, + ) + + return df + + def load_data(self, spec: ComponentSpec) -> dd.DataFrame: + subsets = [] + for name, subset in spec.input_subsets.items(): + fields = list(subset.fields.keys()) + subset_df = self._load_subset(name, fields) + subsets.append(subset_df) + + # TODO this method should return a single dataframe with column_names called subset_field + # TODO add index + # df = concatenate_datasets(subsets) + + # return df + + def _upload_index(self, df: dd.DataFrame): + # get location + # TODO remove prefix and suffix + remote_path = "gcs://" + self.manifest.index.location + + # upload to the cloud + dd.to_parquet( + df, + remote_path, + schema=self.index_schema, + overwrite=True, + ) + + def _upload_subset(self, name: str, fields: Mapping[str, Field], df: dd.DataFrame): + # add subset to the manifest + manifest_fields = [(field.name, Type[field.type]) for field in fields.values()] + self.manifest.add_subset(name, fields=manifest_fields) + + # create expected schema + expected_schema = {field.name: field.type for field in fields.values()} + expected_schema.update(self.index_schema) + + # TODO remove prefix + remote_path = "gcs://" + self.manifest.subsets[name].location + + # upload to the cloud + dd.to_parquet(df, remote_path, schema=expected_schema, overwrite=True) + + def add_index(self, df: dd.DataFrame): + index_columns = list(self.manifest.index.fields.keys()) + + # load index dataframe + index_df = df[index_columns] + + self._upload_index(index_df) + + def add_subsets(self, df: dd.DataFrame, spec: ComponentSpec): + for name, subset in spec.output_subsets.items(): + fields = list(subset.fields.keys()) + # verify fields are present in the output dataframe + subset_columns = [f"{name}_{field}" for field in fields] + subset_columns.extend(self.mandatory_subset_columns) + + for col in subset_columns: + if col not in df.columns: + raise ValueError( + f"Field {col} defined in output subset {name} but not found in dataframe" + ) + + # load subset dataframe + subset_df = df[subset_columns] + # remove subset prefix from subset columns + subset_df = subset_df.rename( + columns={ + col: col.split("_")[-1] + for col in subset_df.columns + if col not in self.mandatory_subset_columns + } + ) + # add to the manifest and upload + self._upload_subset(name, subset.fields, subset_df) + + def upload(self, save_path): + Path(save_path).parent.mkdir(parents=True, exist_ok=True) + self.manifest.to_file(save_path) + return None + + +class FondantComponent: + def __init__(self, type="transform"): + # note: Fondant spec always needs to be called like this + # and placed in the src directory + self.spec = ComponentSpec("fondant_component.yaml") + self.type = type + + def run(self) -> dd.DataFrame: + """ + Parses input data, executes the transform, and creates the output manifest. 
+ + Returns: + Manifest: the output manifest + """ + # step 1: add and parse arguments + args = self._add_and_parse_args() + # step 2: create Fondant dataset based on input manifest + metadata = json.loads(args.metadata) + if self.type == "load": + # TODO ideally get rid of args.metadata + # by including them in the storage args, + # getting run_id based on args.output_manifest_path + manifest = Manifest.create( + base_path=metadata["base_path"], + run_id=metadata["run_id"], + component_id=metadata["component_id"], + ) + else: + manifest = Manifest.from_file(args.input_manifest_path) + dataset = FondantDataset(manifest) + # step 3: load or transform data + if self.type == "load": + df = self.load(args) + dataset.add_index(df) + dataset.add_subsets(df, self.spec) + else: + # create HF dataset, based on component spec + input_dataset = dataset.load_data(self.spec) + # provide this dataset to the user + df = self.transform( + dataset=input_dataset, + args=args, + ) + + # step 4: create output manifest + output_manifest = dataset.upload(save_path=args.output_manifest_path) + + return output_manifest + + def _add_and_parse_args(self): + """ + Add and parse component arguments based on the component specification. + """ + parser = argparse.ArgumentParser() + # add input args + for arg in self.spec.input_arguments.values(): + parser.add_argument( + f"--{arg.name}", + type=kubeflow2python_type[arg.type], + required=False + if self.type == "load" and arg.name == "input_manifest_path" + else True, + help=arg.description, + ) + # add output args + for arg in self.spec.output_arguments.values(): + parser.add_argument( + f"--{arg.name}", + required=True, + type=str, + help=arg.description, + ) + # add metadata + parser.add_argument( + "--metadata", + type=str, + required=True, + help="The metadata associated with the pipeline run", + ) + + return parser.parse_args() + + @abstractmethod + def load(self, args) -> dd.DataFrame: + """Load initial dataset""" + + @abstractmethod + def transform(self, dataset, args) -> dd.DataFrame: + """Transform existing dataset""" diff --git a/express/gcp_storage.py b/express/gcp_storage.py deleted file mode 100644 index 8fe55b0ff..000000000 --- a/express/gcp_storage.py +++ /dev/null @@ -1,156 +0,0 @@ -""" -General helper class to handle gcp storage functionalities -""" -import subprocess # nosec -import os -import logging -from typing import List -from urllib.parse import urlparse - -from express.storage_interface import StorageHandlerInterface, DecodedBlobPath -from express import io - -logger = logging.getLogger(__name__) - - -class StorageHandler(StorageHandlerInterface): - """Cloud storage handler class""" - - @staticmethod - def decode_blob_path(fully_qualified_blob_path: str) -> DecodedBlobPath: - """ - Function that decodes a given blob path to a list of [bucket_name, blob_path] - Args: - fully_qualified_blob_path (str): the fully qualified blob path that points to the blob - containing the blob of interest - Returns: - DecodedBlobPath: a dataclass containing the bucket and blob path name - """ - parsed_url = urlparse(fully_qualified_blob_path) - if parsed_url.scheme == "gs": - bucket, blob_path = parsed_url.hostname, parsed_url.path[1:] - else: - path = parsed_url.path[1:].split("/", 1) - bucket, blob_path = path[0], path[1] - - return DecodedBlobPath(bucket, blob_path) - - @staticmethod - def construct_blob_path(bucket: str, blob_path: str) -> str: - """ - Function that construct a fully qualified blob path from a bucket and a blob path - Args: - bucket (str): 
the bucket name - blob_path (str): the blob path - Returns - str: the fully qualified blob path - """ - - return f"gs://{bucket}/{blob_path}" - - @staticmethod - def get_blob_list(fully_qualified_blob_path: str) -> List[str]: - """ - Function that returns the full list of blobs from a given path - Args: - fully_qualified_blob_path (str): the fully qualified blob path that points to the blob - containing the blob of interest - Returns: - List [str]: the list of blobs - """ - blob_list = ( - subprocess.run( # nosec - ["gsutil", "ls", fully_qualified_blob_path], - capture_output=True, - check=True, - ) - .stdout.decode() - .split("\n") - ) - - return blob_list - - @staticmethod - def copy_folder( - source: str, destination: str, copy_source_content: bool = False - ) -> str: - """ - Function that copies a source folder (or blob) from a remote/local source to a local/remote - location respectively - Args: - source (str): the source blob/folder - destination (str): the destination blob/folder to copy the folder to - copy_source_content (bool): whether to copy all the folder content of the source folder - to the destination folder path (dump content of one folder to another) - Returns - str: the path to the destination copied folder - """ - if copy_source_content: - if source[-1] != "\\": - source = f"{source}\\*" - else: - source = f"{source}*" - - subprocess.run( # nosec - [ - "gsutil", - "-o", - '"GSUtil:use_gcloud_storage=True"', - "-q", - "-m", - "cp", - "-r", - source, - destination, - ], - check=True, - ) - - logger.info("Copying folder from %s to %s [DONE]", source, destination) - - folder_name = io.get_file_name(source) - - return os.path.join(destination, folder_name) - - @staticmethod - def copy_files(source_files: List[str], destination: str): - """ - Function that copies a source folder (or blob) from a remote/local source to a local/remote - location respectively - Args: - source_files (List[str]): a list containing the url (local or remote) of the file to - copy - destination (str): the destination blob/folder to copy the files to - """ - with subprocess.Popen( # nosec - [ - "gsutil", - "-o", - '"GSUtil:use_gcloud_storage=True"', - "-q", - "-m", - "cp", - "-I", - destination, - ], - stdin=subprocess.PIPE, - ) as gsutil_copy: - gsutil_copy.communicate(b"\n".join([f.encode() for f in source_files])) - - logger.info( - "A total of %s files were copied to %s", len(source_files), destination - ) - - def copy_file(self, source_file: str, destination: str) -> str: - """ - Function that copies source files from a remote/local source to a local/remote - location respectively - Args: - source_file (str): the url (local or remote) of the file to copy - destination (str): the destination blob/folder to copy the files to - Returns: - str: the path where the file was copied to - """ - self.copy_files([source_file], destination) - file_name = io.get_file_name(source_file, return_extension=True) - return os.path.join(destination, file_name) diff --git a/express/io.py b/express/io.py deleted file mode 100644 index 4643ee741..000000000 --- a/express/io.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -General I/O helpers function -""" -import pathlib -import logging -import os - -logger = logging.getLogger(__name__) - - -def get_file_extension(file_name: str) -> str: - """ - Function that returns a file extension from a file name - Args: - file_name (str): the file name to return the extension from - Returns: - (str): the file extension - """ - return pathlib.Path(file_name).suffix[1:] - - -def 
get_file_name(file_uri: str, return_extension=False): - """ - Function that returns the file name from a given gcs uri - Args: - file_uri (str): the file uri - return_extension (bool): a boolean to indicate whether to return the file extension or not - """ - path, extension = os.path.splitext(file_uri) - file_name = os.path.basename(path) - if return_extension: - file_name = f"{file_name}{extension}" - return file_name diff --git a/express/manifest.py b/express/manifest.py index 271db1dcb..659660d74 100644 --- a/express/manifest.py +++ b/express/manifest.py @@ -29,8 +29,8 @@ def __init__(self, specification: dict, *, base_path: str) -> None: @property def location(self) -> str: - """The resolved location of the subset""" - return self._base_path.rstrip("/") + self._specification["location"] + """The absolute location of the subset""" + return self._base_path + self._specification["location"] @property def fields(self) -> t.Mapping[str, Field]: @@ -109,7 +109,7 @@ def create(cls, *, base_path: str, run_id: str, component_id: str) -> "Manifest" "run_id": run_id, "component_id": component_id, }, - "index": {"location": f"/index/{run_id}/{component_id}"}, + "index": {"location": f"/{run_id}/{component_id}/index"}, "subsets": {}, } return cls(specification) @@ -168,11 +168,14 @@ def add_subset(self, name: str, fields: t.List[t.Tuple[str, Type]]) -> None: raise ValueError("A subset with name {name} already exists") self._specification["subsets"][name] = { - "location": f"/{name}/{self.run_id}/{self.component_id}", + "location": f"/{self.run_id}/{self.component_id}/{name}", "fields": {name: {"type": type_.value} for name, type_ in fields}, } def remove_subset(self, name: str) -> None: + if name not in self._specification["subsets"]: + raise ValueError(f"Subset {name} not found in specification") + del self._specification["subsets"][name] def __repr__(self) -> str: diff --git a/express/storage_interface.py b/express/storage_interface.py deleted file mode 100644 index 6bcb26646..000000000 --- a/express/storage_interface.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -General interface class to unify storage functions across different cloud environments -""" - -from abc import ABC, abstractmethod -from typing import List -from dataclasses import dataclass - -from dataclasses_json import dataclass_json - - -@dataclass_json -@dataclass -class StorageHandlerModule: - """Datclass to define module path for the different cloud Storage Handlers""" - - GCP: str = "express.gcp_storage" - AWS: str = "express.aws_storage" - AZURE: str = "express.azure_storage" - - -@dataclass_json -@dataclass -class DecodedBlobPath: - """Dataclass for blob path construct""" - - bucket_name: str - blob_path: str - - -class StorageHandlerInterface(ABC): - """ - General helper class for a unified interface of storage helpers across different cloud - platforms - """ - - @staticmethod - @abstractmethod - def decode_blob_path(fully_qualified_blob_path) -> DecodedBlobPath: - """ - Function that decodes a given blob path to a list of - [bucket_name, blob_path] - Args: - fully_qualified_blob_path (str): the fully qualified blob path that points to the blob - containing the blob of interest - """ - - @staticmethod - @abstractmethod - def construct_blob_path(bucket: str, blob_path: str) -> str: - """ - Function that construct a fully qualified blob path from a bucket and a blob path - Args: - bucket (str): the bucket name - blob_path (str): the blob path - Returns - str: the fully qualified blob path - """ - - @staticmethod - @abstractmethod - 
def get_blob_list(fully_qualified_blob_path: str) -> List: - """ - Function that returns the full list of blobs from a given path - Args: - fully_qualified_blob_path (str): the fully qualified blob path that points to the blob - containing the blob of interest - Returns: - List: the list of blobs - """ - - @staticmethod - @abstractmethod - def copy_folder( - source: str, destination: str, copy_source_content: bool = False - ) -> str: - """ - Function that copies a source folder (or blob) from a remote/local source to a local/remote - location respectively - Args: - source (str): the source blob/folder - destination (str): the destination blob/folder to copy the folder to - copy_source_content (bool): whether to copy all the folder content of the source folder - to the destination folder path (dump content of one folder to another) - Returns - str: the path to the destination copied folder - """ - - @abstractmethod - def copy_file(self, source_file: str, destination: str) -> str: - """ - Function that copies source files from a remote/local source to a local/remote - location respectively - Args: - source_file (str): the url (local or remote) of the file to copy - destination (str): the destination blob/folder to copy the files to - Returns: - str: the path where the file was copied to - """ - - @staticmethod - @abstractmethod - def copy_files(source_files: List[str], destination: str): - """ - Function that copies source files from a remote/local source to a local/remote - location respectively - Args: - source_files (Union[str, List[str]]): a list containing the urls (local or remote) of - the file to copy - destination (str): the destination blob/folder to copy the files to - """ - - def copy_parquet(self, parquet_path: str, destination: str) -> str: - """ - Function that copies source files from a remote/local source to a local/remote - location respectively - Args: - parquet_path (str): path to parquet. 
Can point to a single parquet file or folder for - partitions - destination (str): the destination blob/folder to copy the files to - Returns: - str: the path where the parquet file/folder was copied to - """ - if ".parquet" in parquet_path: - local_parquet_path = self.copy_file(parquet_path, destination) - else: - local_parquet_path = self.copy_folder(parquet_path, destination) - - return local_parquet_path diff --git a/pyproject.toml b/pyproject.toml index ede6e8f76..28a893a27 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,17 +40,14 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.8" -dataclasses-json = "^0.5.7" jsonschema = "^4.17.3" +dask = "^2022.2.0" -datasets = { version = "^2.10.1", optional = true } kfp = { version = "^1.8.19", optional = true } kubernetes = { version = "^18.20.0", optional = true } pandas = { version = "^1.3.5", optional = true } [tool.poetry.extras] -datasets = ["datasets"] -pandas = ["pandas"] pipelines = ["kfp", "kubernetes"] [tool.poetry.group.test.dependencies] diff --git a/tests/test_components_specs.py b/tests/test_components_specs.py index 8728451a4..35584a6d2 100644 --- a/tests/test_components_specs.py +++ b/tests/test_components_specs.py @@ -3,7 +3,7 @@ import pytest import yaml from express.exceptions import InvalidComponentSpec -from express.component_spec import ExpressComponent +from express.component_spec import ComponentSpec valid_path = os.path.join("tests/component_example", "valid_component") invalid_path = os.path.join("tests/component_example", "invalid_component") @@ -26,9 +26,9 @@ def invalid_express_schema() -> str: def test_component_spec_validation(valid_express_schema, invalid_express_schema): """Test that the manifest is validated correctly on instantiation""" - ExpressComponent(valid_express_schema) + ComponentSpec(valid_express_schema) with pytest.raises(InvalidComponentSpec): - ExpressComponent(invalid_express_schema) + ComponentSpec(invalid_express_schema) def test_attribute_access(valid_express_schema): @@ -37,7 +37,7 @@ def test_attribute_access(valid_express_schema): - Fixed properties should be accessible as an attribute - Dynamic properties should be accessible by lookup """ - express_component = ExpressComponent(valid_express_schema) + express_component = ComponentSpec(valid_express_schema) assert express_component.name == "Example component" assert express_component.description == "This is an example component" @@ -48,6 +48,6 @@ def test_kfp_component_creation(valid_express_schema, valid_kubeflow_schema): """ Test that the created kubeflow component matches the expected kubeflow component """ - express_component = ExpressComponent(valid_express_schema) + express_component = ComponentSpec(valid_express_schema) kubeflow_schema = yaml.safe_load(open(valid_kubeflow_schema, 'r')) assert express_component.kubeflow_component_specification == kubeflow_schema diff --git a/tests/test_gcp_storage.py b/tests/test_gcp_storage.py deleted file mode 100644 index bcc59850a..000000000 --- a/tests/test_gcp_storage.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Test scripts for gcp storage functionalities -""" -import pytest - -from express.storage_interface import DecodedBlobPath -from express.gcp_storage import StorageHandler - - -@pytest.mark.parametrize("fully_qualified_blob_path, expected_result", [ - - ("gs://my-bucket/my-file.txt", - DecodedBlobPath(bucket_name="my-bucket", blob_path="my-file.txt")), - - ("gs://my-bucket/my-folder/my-file.txt", - DecodedBlobPath(bucket_name="my-bucket", blob_path="my-folder/my-file.txt")), - 
- ("gs://bucket/path/to/my-file.test.txt", - DecodedBlobPath(bucket_name="bucket", blob_path="path/to/my-file.test.txt")) -]) -def test_decode_blob_path(fully_qualified_blob_path, expected_result): - """Test blob path decoding""" - handler = StorageHandler() - assert handler.decode_blob_path(fully_qualified_blob_path) == expected_result diff --git a/tests/test_import_utils.py b/tests/test_import_utils.py index 46253ab76..e53c8fa4d 100644 --- a/tests/test_import_utils.py +++ b/tests/test_import_utils.py @@ -7,7 +7,7 @@ @pytest.mark.parametrize("package_name, import_error_msg, expected_result", [ - ("dataclasses_json", "dataclasses json package is not installed.", True), + ("jsonschema", "jsonschema package is not installed.", True), ("nonexistentpackage", "This package does not exist.", pytest.raises(ModuleNotFoundError)), ]) def test_is_package_available(package_name, import_error_msg, expected_result): diff --git a/tests/test_io.py b/tests/test_io.py deleted file mode 100644 index fcdb2fe57..000000000 --- a/tests/test_io.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -Test scripts for io functionalities -""" -import pytest - -from express.io import get_file_name, get_file_extension - - -@pytest.mark.parametrize( - "file_uri, return_extension, expected_result", - [ - ("gs://bucket/path/to/file", False, "file"), - ("gs://bucket/path/to/file.txt", True, "file.txt"), - ("gs://bucket/path/to/file.name.with.dots.csv", False, "file.name.with.dots"), - ("gs://bucket/path/to/file.name.with.dots.csv", True, "file.name.with.dots.csv"), - ], -) -def test_get_file_name(file_uri, return_extension, expected_result): - """Test get file name function""" - assert get_file_name(file_uri, return_extension) == expected_result - - -@pytest.mark.parametrize( - "file_name, expected_result", - [ - ("file.jpg", "jpg"), - ("file", ""), - ("file.test.jpg", "jpg"), - ("file/test.jpg", "jpg"), - ], -) -def test_file_extension(file_name, expected_result): - """Test get file extension function""" - assert get_file_extension(file_name) == expected_result diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 3156f2cf2..8391deb9e 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -104,11 +104,11 @@ def test_manifest_creation(): "component_id": component_id, }, "index": { - "location": f"/index/{run_id}/{component_id}" + "location": f"/{run_id}/{component_id}/index" }, "subsets": { "images": { - "location": f"/images/{run_id}/{component_id}", + "location": f"/{run_id}/{component_id}/images", "fields": { "width": { "type": "int32",