diff --git a/components/pandas/Transform_DataFrame/_samples/sample_pipeline.py b/components/pandas/Transform_DataFrame/_samples/sample_pipeline.py index 9d6de4759cc..da01b50506d 100644 --- a/components/pandas/Transform_DataFrame/_samples/sample_pipeline.py +++ b/components/pandas/Transform_DataFrame/_samples/sample_pipeline.py @@ -4,9 +4,10 @@ chicago_taxi_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e3337b8bdcd63636934954e592d4b32c95b49129/components/datasets/Chicago%20Taxi/component.yaml') convert_csv_to_apache_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d737c448723b9f541a3543012b4414c17b2eab5c/components/_converters/ApacheParquet/from_CSV/component.yaml') +convert_apache_parquet_to_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/af3eaf64e87313795cad1add9bfd9fa1e86af6de/components/_converters/ApacheParquet/to_CSV/component.yaml') -pandas_transform_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e69a6694/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml') -pandas_transform_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e69a6694/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml') +pandas_transform_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml') +pandas_transform_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml') def pandas_transform_pipeline(): @@ -22,12 +23,35 @@ def pandas_transform_pipeline(): table=training_data_in_parquet, transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''', ).output + convert_apache_parquet_to_csv_op(training_data_for_classification_in_parquet) + + features_in_parquet = pandas_transform_parquet_op( + table=training_data_in_parquet, + transform_code='''df = df.drop(columns=["tips"])''', + ).output + convert_apache_parquet_to_csv_op(features_in_parquet) + + labels_in_parquet = pandas_transform_parquet_op( + table=training_data_in_parquet, + transform_code='''df = df[["tips"]]''', + ).output + convert_apache_parquet_to_csv_op(labels_in_parquet) training_data_for_classification_in_csv = pandas_transform_csv_op( table=training_data_in_csv, transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''', ).output + features_in_csv = pandas_transform_csv_op( + table=training_data_in_csv, + transform_code='''df = df.drop(columns=["tips"])''', + ).output + + labels_in_csv = pandas_transform_csv_op( + table=training_data_in_csv, + transform_code='''df = df[["tips"]]''', + ).output + if __name__ == '__main__': kfp_endpoint=None diff --git a/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.py b/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.py index 94bd6c47f41..3ca36a270c0 100644 --- a/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.py +++ b/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.py @@ -23,11 +23,12 @@ def Pandas_Transform_DataFrame_in_ApacheParquet_format( author: Alexey Volkov ''' import pandas - from pyarrow import parquet df = pandas.read_parquet(table_path) - exec(transform_code) - df.to_parquet(transformed_table_path) + # The namespace is needed so that the code can replace `df`. For example df = df[['X']] + namespace = locals() + exec(transform_code, namespace) + namespace['df'].to_parquet(transformed_table_path) if __name__ == '__main__': diff --git a/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml b/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml index e424049a671..233f8b8d663 100644 --- a/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml +++ b/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml @@ -63,11 +63,12 @@ implementation: author: Alexey Volkov ''' import pandas - from pyarrow import parquet df = pandas.read_parquet(table_path) - exec(transform_code) - df.to_parquet(transformed_table_path) + # The namespace is needed so that the code can replace `df`. For example df = df[['X']] + namespace = locals() + exec(transform_code, namespace) + namespace['df'].to_parquet(transformed_table_path) import argparse _parser = argparse.ArgumentParser(prog='Pandas Transform DataFrame in ApacheParquet format', description='Transform DataFrame loaded from an ApacheParquet file.\n\n Inputs:\n table: DataFrame to transform.\n transform_code: Transformation code. Code is written in Python and can consist of multiple lines.\n The DataFrame variable is called "df".\n Examples:\n - `df[\'prod\'] = df[\'X\'] * df[\'Y\']`\n - `df = df[[\'X\', \'prod\']]`\n - `df.insert(0, "is_positive", df["X"] > 0)`\n\n Outputs:\n transformed_table: Transformed DataFrame.\n\n Annotations:\n author: Alexey Volkov ') diff --git a/components/pandas/Transform_DataFrame/in_CSV_format/component.py b/components/pandas/Transform_DataFrame/in_CSV_format/component.py index 3e72b3f3427..0bbfe77446e 100644 --- a/components/pandas/Transform_DataFrame/in_CSV_format/component.py +++ b/components/pandas/Transform_DataFrame/in_CSV_format/component.py @@ -27,8 +27,10 @@ def Pandas_Transform_DataFrame_in_CSV_format( df = pandas.read_csv( table_path, ) - exec(transform_code) - df.to_csv( + # The namespace is needed so that the code can replace `df`. For example df = df[['X']] + namespace = locals() + exec(transform_code, namespace) + namespace['df'].to_csv( transformed_table_path, index=False, ) diff --git a/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml b/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml index 3bccf33b522..e40ee113ace 100644 --- a/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml +++ b/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml @@ -66,8 +66,10 @@ implementation: df = pandas.read_csv( table_path, ) - exec(transform_code) - df.to_csv( + # The namespace is needed so that the code can replace `df`. For example df = df[['X']] + namespace = locals() + exec(transform_code, namespace) + namespace['df'].to_csv( transformed_table_path, index=False, )