Skip to content

Commit

Permalink
Components - Converters from ApacheParquet to CSV and TSV (#4031)
Browse files Browse the repository at this point in the history
* Components - Converters from ApacheParquet to CSV and TSV

* Updated the sample pipeline
  • Loading branch information
Ark-kun authored Jun 23, 2020
1 parent ea94251 commit c4340f6
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import kfp
from kfp import components

# Component store that resolves component names against a pinned commit of
# the kubeflow/pipelines repository (the hash inside the URL prefix).
# NOTE(review): the first assignment is overwritten by the second one right
# below it; this looks like the pre-change line of the diff shown next to its
# replacement -- confirm that only the second line exists in the real file.
component_store = components.ComponentStore(url_search_prefixes=['https://raw.githubusercontent.com/kubeflow/pipelines/0d7d6f41c92bdc05c2825232afe2b47e5cb6c4b3/components/'])
component_store = components.ComponentStore(url_search_prefixes=['https://raw.githubusercontent.com/kubeflow/pipelines/af3eaf64e87313795cad1add9bfd9fa1e86af6de/components/'])

# Load every component used by the sample pipeline by its path-like name
# under the search prefix: the dataset source first, then the converters
# into and out of Apache Parquet.
chicago_taxi_dataset_op = component_store.load_component(name='datasets/Chicago_Taxi_Trips')
convert_csv_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_CSV')
convert_tsv_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_TSV')
convert_apache_parquet_to_csv_op = component_store.load_component(name='_converters/ApacheParquet/to_CSV')
convert_apache_parquet_to_tsv_op = component_store.load_component(name='_converters/ApacheParquet/to_TSV')
convert_apache_parquet_to_apache_arrow_feather_op = component_store.load_component(name='_converters/ApacheParquet/to_ApacheArrowFeather')
convert_apache_arrow_feather_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_ApacheArrowFeather')

Expand All @@ -25,10 +27,12 @@ def parquet_pipeline():
).output

csv_parquet = convert_csv_to_apache_parquet_op(csv).output
csv_parquet_csv = convert_apache_parquet_to_csv_op(csv_parquet).output
csv_parquet_feather = convert_apache_parquet_to_apache_arrow_feather_op(csv_parquet).output
csv_parquet_feather_parquet = convert_apache_arrow_feather_to_apache_parquet_op(csv_parquet_feather).output

tsv_parquet = convert_tsv_to_apache_parquet_op(tsv).output
tsv_parquet_tsv = convert_apache_parquet_to_tsv_op(tsv_parquet).output
tsv_parquet_feather = convert_apache_parquet_to_apache_arrow_feather_op(tsv_parquet).output
tsv_parquet_feather_parquet = convert_apache_arrow_feather_to_apache_parquet_op(tsv_parquet_feather).output

Expand Down
29 changes: 29 additions & 0 deletions components/_converters/ApacheParquet/to_CSV/component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from kfp.components import InputPath, OutputPath, create_component_from_func

def convert_apache_parquet_to_csv(
    data_path: InputPath('ApacheParquet'),
    output_data_path: OutputPath('CSV'),
):
    '''Converts Apache Parquet to CSV.

    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    # The import stays inside the function: the generated component.yaml
    # embeds this function's source and runs it as a standalone program.
    from pyarrow import parquet

    # Read the Parquet file into an Arrow table, then hand it to pandas.
    arrow_table = parquet.read_pandas(data_path)
    frame = arrow_table.to_pandas()

    # index=False: only the columns are written, not the DataFrame index.
    frame.to_csv(output_data_path, index=False)


if __name__ == '__main__':
    # Running this file as a script builds the component from the function
    # defined in this file and writes its definition to component.yaml.
    convert_apache_parquet_to_csv_op = create_component_from_func(
        convert_apache_parquet_to_csv,
        base_image='python:3.7',
        # Exact versions are pinned for the packages installed at start-up.
        packages_to_install=[
            'pyarrow==0.17.1',
            'pandas==1.0.3',
        ],
        output_component_file='component.yaml',
    )
62 changes: 62 additions & 0 deletions components/_converters/ApacheParquet/to_CSV/component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
name: Convert apache parquet to csv
description: |-
Converts Apache Parquet to CSV.
[Apache Parquet](https://parquet.apache.org/)
Annotations:
author: Alexey Volkov <alexey.volkov@ark-kun.com>
inputs:
- {name: data, type: ApacheParquet}
outputs:
- {name: output_data, type: CSV}
implementation:
container:
image: python:3.7
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'pyarrow==0.17.1' 'pandas==1.0.3' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3
-m pip install --quiet --no-warn-script-location 'pyarrow==0.17.1' 'pandas==1.0.3'
--user) && "$0" "$@"
- python3
- -u
- -c
- |
def _make_parent_dirs_and_return_path(file_path: str):
    '''Creates the parent directory of file_path and returns file_path unchanged.

    Used as an argparse ``type=`` converter for output paths, so that the
    directory exists before the program writes the output file.

    Args:
        file_path: Path of the file that is about to be written.

    Returns:
        The same file_path that was passed in.
    '''
    import os
    parent_dir = os.path.dirname(file_path)
    # A bare file name has an empty directory part, and os.makedirs('')
    # raises FileNotFoundError; there is nothing to create in that case.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    return file_path
def convert_apache_parquet_to_csv(
data_path,
output_data_path,
):
'''Converts Apache Parquet to CSV.
[Apache Parquet](https://parquet.apache.org/)
Annotations:
author: Alexey Volkov <alexey.volkov@ark-kun.com>
'''
from pyarrow import parquet
data_frame = parquet.read_pandas(data_path).to_pandas()
data_frame.to_csv(
output_data_path,
index=False,
)
import argparse
_parser = argparse.ArgumentParser(prog='Convert apache parquet to csv', description='Converts Apache Parquet to CSV.\n\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov <alexey.volkov@ark-kun.com>')
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = convert_apache_parquet_to_csv(**_parsed_args)
args:
- --data
- {inputPath: data}
- --output-data
- {outputPath: output_data}
30 changes: 30 additions & 0 deletions components/_converters/ApacheParquet/to_TSV/component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from kfp.components import InputPath, OutputPath, create_component_from_func

def convert_apache_parquet_to_tsv(
    data_path: InputPath('ApacheParquet'),
    output_data_path: OutputPath('TSV'),
):
    '''Converts Apache Parquet to TSV.

    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    # The import stays inside the function: the generated component.yaml
    # embeds this function's source and runs it as a standalone program.
    from pyarrow import parquet

    # Read the Parquet file into an Arrow table, then hand it to pandas.
    arrow_table = parquet.read_pandas(data_path)
    frame = arrow_table.to_pandas()

    # Tab-separated output; index=False writes only the columns, not the
    # DataFrame index.
    frame.to_csv(output_data_path, sep='\t', index=False)


if __name__ == '__main__':
    # Running this file as a script builds the component from the function
    # defined in this file and writes its definition to component.yaml.
    convert_apache_parquet_to_tsv_op = create_component_from_func(
        convert_apache_parquet_to_tsv,
        base_image='python:3.7',
        # Exact versions are pinned for the packages installed at start-up.
        packages_to_install=[
            'pyarrow==0.17.1',
            'pandas==1.0.3',
        ],
        output_component_file='component.yaml',
    )
63 changes: 63 additions & 0 deletions components/_converters/ApacheParquet/to_TSV/component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
name: Convert apache parquet to tsv
description: |-
Converts Apache Parquet to TSV.
[Apache Parquet](https://parquet.apache.org/)
Annotations:
author: Alexey Volkov <alexey.volkov@ark-kun.com>
inputs:
- {name: data, type: ApacheParquet}
outputs:
- {name: output_data, type: TSV}
implementation:
container:
image: python:3.7
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'pyarrow==0.17.1' 'pandas==1.0.3' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3
-m pip install --quiet --no-warn-script-location 'pyarrow==0.17.1' 'pandas==1.0.3'
--user) && "$0" "$@"
- python3
- -u
- -c
- |
def _make_parent_dirs_and_return_path(file_path: str):
    '''Creates the parent directory of file_path and returns file_path unchanged.

    Used as an argparse ``type=`` converter for output paths, so that the
    directory exists before the program writes the output file.

    Args:
        file_path: Path of the file that is about to be written.

    Returns:
        The same file_path that was passed in.
    '''
    import os
    parent_dir = os.path.dirname(file_path)
    # A bare file name has an empty directory part, and os.makedirs('')
    # raises FileNotFoundError; there is nothing to create in that case.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    return file_path
def convert_apache_parquet_to_tsv(
data_path,
output_data_path,
):
'''Converts Apache Parquet to TSV.
[Apache Parquet](https://parquet.apache.org/)
Annotations:
author: Alexey Volkov <alexey.volkov@ark-kun.com>
'''
from pyarrow import parquet
data_frame = parquet.read_pandas(data_path).to_pandas()
data_frame.to_csv(
output_data_path,
index=False,
sep='\t',
)
import argparse
_parser = argparse.ArgumentParser(prog='Convert apache parquet to tsv', description='Converts Apache Parquet to TSV.\n\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov <alexey.volkov@ark-kun.com>')
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = convert_apache_parquet_to_tsv(**_parsed_args)
args:
- --data
- {inputPath: data}
- --output-data
- {outputPath: output_data}

0 comments on commit c4340f6

Please sign in to comment.