Commit

Components - pandas - Transform DataFrame - Fixed transforms that replace dataframe (#4042)

* Components - pandas - Transform DataFrame - Fixed transforms that replace dataframe

* Updated the sample pipeline
Ark-kun committed Jun 23, 2020
1 parent c7ef9b4 commit cb4c1b2
Showing 5 changed files with 42 additions and 12 deletions.
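
The underlying issue: these components run `transform_code` with `exec()` inside a function, and code executed that way cannot rebind the function's local `df`. In-place edits such as `df.insert(...)` took effect, but replacing transforms such as `df = df[['X']]` were silently dropped. The fix passes an explicit namespace dict to `exec()` and reads `df` back from it. A minimal standalone sketch of the before/after behaviour (illustrative function names, not the component code itself):

import pandas

def transform_broken(df, transform_code):
    exec(transform_code)            # assignments by the executed code do not update the function's local `df`
    return df                       # still the original DataFrame

def transform_fixed(df, transform_code):
    namespace = locals()            # explicit dict shared with the executed code
    exec(transform_code, namespace)
    return namespace['df']          # picks up `df` even if the code rebound it

df = pandas.DataFrame({'trip_total': [10.0, 20.0], 'tips': [0.0, 2.5]})
print(transform_broken(df, "df = df[['tips']]").columns.tolist())  # ['trip_total', 'tips']
print(transform_fixed(df, "df = df[['tips']]").columns.tolist())   # ['tips']
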
28 changes: 26 additions & 2 deletions components/pandas/Transform_DataFrame/_samples/sample_pipeline.py
@@ -4,9 +4,10 @@

chicago_taxi_dataset_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e3337b8bdcd63636934954e592d4b32c95b49129/components/datasets/Chicago%20Taxi/component.yaml')
convert_csv_to_apache_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d737c448723b9f541a3543012b4414c17b2eab5c/components/_converters/ApacheParquet/from_CSV/component.yaml')
convert_apache_parquet_to_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/af3eaf64e87313795cad1add9bfd9fa1e86af6de/components/_converters/ApacheParquet/to_CSV/component.yaml')

pandas_transform_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e69a6694/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml')
pandas_transform_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e69a6694/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml')
pandas_transform_parquet_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_ApacheParquet_format/component.yaml')
pandas_transform_csv_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/6162d55998b176b50267d351241100bb0ee715bc/components/pandas/Transform_DataFrame/in_CSV_format/component.yaml')


def pandas_transform_pipeline():
@@ -22,12 +23,35 @@ def pandas_transform_pipeline():
        table=training_data_in_parquet,
        transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''',
    ).output
    convert_apache_parquet_to_csv_op(training_data_for_classification_in_parquet)

    features_in_parquet = pandas_transform_parquet_op(
        table=training_data_in_parquet,
        transform_code='''df = df.drop(columns=["tips"])''',
    ).output
    convert_apache_parquet_to_csv_op(features_in_parquet)

    labels_in_parquet = pandas_transform_parquet_op(
        table=training_data_in_parquet,
        transform_code='''df = df[["tips"]]''',
    ).output
    convert_apache_parquet_to_csv_op(labels_in_parquet)

    training_data_for_classification_in_csv = pandas_transform_csv_op(
        table=training_data_in_csv,
        transform_code='''df.insert(0, "was_tipped", df["tips"] > 0); del df["tips"]''',
    ).output

    features_in_csv = pandas_transform_csv_op(
        table=training_data_in_csv,
        transform_code='''df = df.drop(columns=["tips"])''',
    ).output

    labels_in_csv = pandas_transform_csv_op(
        table=training_data_in_csv,
        transform_code='''df = df[["tips"]]''',
    ).output


if __name__ == '__main__':
    kfp_endpoint=None
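
For reference, a minimal sketch of how a sample pipeline like this is typically submitted with the KFP v1 SDK; the endpoint value here is an assumption, point it at your own Kubeflow Pipelines API server:

import kfp

if __name__ == '__main__':
    # Hypothetical endpoint; None falls back to the SDK's default (e.g. in-cluster) configuration.
    kfp_endpoint = None
    kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
        pandas_transform_pipeline,
        arguments={},
    )
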
@@ -23,11 +23,12 @@ def Pandas_Transform_DataFrame_in_ApacheParquet_format(
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    import pandas
    from pyarrow import parquet

    df = pandas.read_parquet(table_path)
    exec(transform_code)
    df.to_parquet(transformed_table_path)
    # The namespace is needed so that the code can replace `df`. For example df = df[['X']]
    namespace = locals()
    exec(transform_code, namespace)
    namespace['df'].to_parquet(transformed_table_path)


if __name__ == '__main__':
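
To sanity-check the fixed logic locally, a small sketch that mirrors the component body above; the wrapper function and file names are illustrative, and pandas' Parquet I/O assumes pyarrow is installed:

import pandas

def run_transform(table_path, transformed_table_path, transform_code):
    df = pandas.read_parquet(table_path)
    namespace = locals()                  # lets transform_code rebind `df`
    exec(transform_code, namespace)
    namespace['df'].to_parquet(transformed_table_path)

pandas.DataFrame({'trip_seconds': [300, 600], 'tips': [0.0, 2.5]}).to_parquet('table.parquet')
run_transform('table.parquet', 'transformed.parquet', "df = df[['tips']]")
print(pandas.read_parquet('transformed.parquet').columns.tolist())  # prints ['tips']
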
@@ -63,11 +63,12 @@ implementation:
author: Alexey Volkov <alexey.volkov@ark-kun.com>
'''
import pandas
from pyarrow import parquet
df = pandas.read_parquet(table_path)
exec(transform_code)
df.to_parquet(transformed_table_path)
# The namespace is needed so that the code can replace `df`. For example df = df[['X']]
namespace = locals()
exec(transform_code, namespace)
namespace['df'].to_parquet(transformed_table_path)
import argparse
_parser = argparse.ArgumentParser(prog='Pandas Transform DataFrame in ApacheParquet format', description='Transform DataFrame loaded from an ApacheParquet file.\n\n Inputs:\n table: DataFrame to transform.\n transform_code: Transformation code. Code is written in Python and can consist of multiple lines.\n The DataFrame variable is called "df".\n Examples:\n - `df[\'prod\'] = df[\'X\'] * df[\'Y\']`\n - `df = df[[\'X\', \'prod\']]`\n - `df.insert(0, "is_positive", df["X"] > 0)`\n\n Outputs:\n transformed_table: Transformed DataFrame.\n\n Annotations:\n author: Alexey Volkov <alexey.volkov@ark-kun.com>')
@@ -27,8 +27,10 @@ def Pandas_Transform_DataFrame_in_CSV_format(
    df = pandas.read_csv(
        table_path,
    )
    exec(transform_code)
    df.to_csv(
    # The namespace is needed so that the code can replace `df`. For example df = df[['X']]
    namespace = locals()
    exec(transform_code, namespace)
    namespace['df'].to_csv(
        transformed_table_path,
        index=False,
    )
@@ -66,8 +66,10 @@ implementation:
df = pandas.read_csv(
table_path,
)
exec(transform_code)
df.to_csv(
# The namespace is needed so that the code can replace `df`. For example df = df[['X']]
namespace = locals()
exec(transform_code, namespace)
namespace['df'].to_csv(
transformed_table_path,
index=False,
)
