Components - Apache Parquet converters (#3834)
* Components - Apache Parquet converters

Added components that convert to and from the Apache Parquet data format

* Added sample pipeline
Ark-kun committed May 29, 2020
1 parent 8d738ea commit d737c44
Showing 9 changed files with 436 additions and 0 deletions.
37 changes: 37 additions & 0 deletions components/_converters/ApacheParquet/_samples/sample_pipeline.py
@@ -0,0 +1,37 @@
import kfp
from kfp import components

component_store = components.ComponentStore(url_search_prefixes=['https://raw.githubusercontent.com/kubeflow/pipelines/0d7d6f41c92bdc05c2825232afe2b47e5cb6c4b3/components/'])

chicago_taxi_dataset_op = component_store.load_component(name='datasets/Chicago_Taxi_Trips')
convert_csv_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_CSV')
convert_tsv_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_TSV')
convert_apache_parquet_to_apache_arrow_feather_op = component_store.load_component(name='_converters/ApacheParquet/to_ApacheArrowFeather')
convert_apache_arrow_feather_to_apache_parquet_op = component_store.load_component(name='_converters/ApacheParquet/from_ApacheArrowFeather')


def parquet_pipeline():
    csv = chicago_taxi_dataset_op(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
        limit=10000,
    ).output

    tsv = chicago_taxi_dataset_op(
        where='trip_start_timestamp >= "2019-01-01" AND trip_start_timestamp < "2019-02-01"',
        select='tips,trip_seconds,trip_miles,pickup_community_area,dropoff_community_area,fare,tolls,extras,trip_total',
        limit=10000,
        format='tsv',
    ).output

    csv_parquet = convert_csv_to_apache_parquet_op(csv).output
    csv_parquet_feather = convert_apache_parquet_to_apache_arrow_feather_op(csv_parquet).output
    csv_parquet_feather_parquet = convert_apache_arrow_feather_to_apache_parquet_op(csv_parquet_feather).output

    tsv_parquet = convert_tsv_to_apache_parquet_op(tsv).output
    tsv_parquet_feather = convert_apache_parquet_to_apache_arrow_feather_op(tsv_parquet).output
    tsv_parquet_feather_parquet = convert_apache_arrow_feather_to_apache_parquet_op(tsv_parquet_feather).output

if __name__ == '__main__':
    kfp_endpoint = None
    kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(parquet_pipeline, arguments={})
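The sample pipeline above is submitted directly through kfp.Client. If no endpoint is at hand, the same pipeline function can instead be compiled into a workflow package and uploaded later. A minimal sketch, assuming the KFP v1 SDK is installed locally; the output file name is a placeholder:

import kfp

# Compile the sample pipeline into a package that can be uploaded
# through the Kubeflow Pipelines UI or API.
kfp.compiler.Compiler().compile(parquet_pipeline, 'parquet_pipeline.yaml')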
27 changes: 27 additions & 0 deletions components/_converters/ApacheParquet/from_ApacheArrowFeather/component.py
@@ -0,0 +1,27 @@
from kfp.components import InputPath, OutputPath, create_component_from_func

def convert_apache_arrow_feather_to_apache_parquet(
    data_path: InputPath('ApacheArrowFeather'),
    output_data_path: OutputPath('ApacheParquet'),
):
    '''Converts Apache Arrow Feather to Apache Parquet.

    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    from pyarrow import feather, parquet

    table = feather.read_table(data_path)
    parquet.write_table(table, output_data_path)


if __name__ == '__main__':
    create_component_from_func(
        convert_apache_arrow_feather_to_apache_parquet,
        output_component_file='component.yaml',
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1']
    )
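Because the converter only depends on pyarrow, it can be smoke-tested outside a pipeline by calling the function directly. A rough local check (not part of the component), assuming pyarrow 0.17+ is installed and using throwaway /tmp paths:

import pyarrow as pa
from pyarrow import feather, parquet

# Write a tiny table to Feather, convert it, and count the rows in the resulting Parquet file.
table = pa.Table.from_pydict({'tips': [1.0, 2.5], 'trip_miles': [0.3, 4.2]})
feather.write_feather(table, '/tmp/sample.feather')

convert_apache_arrow_feather_to_apache_parquet('/tmp/sample.feather', '/tmp/sample.parquet')
print(parquet.read_table('/tmp/sample.parquet').num_rows)  # expected: 2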
74 changes: 74 additions & 0 deletions components/_converters/ApacheParquet/from_ApacheArrowFeather/component.yaml
@@ -0,0 +1,74 @@
name: Convert apache arrow feather to apache parquet
description: |-
  Converts Apache Arrow Feather to Apache Parquet.

  [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
  [Apache Parquet](https://parquet.apache.org/)

  Annotations:
      author: Alexey Volkov <alexey.volkov@ark-kun.com>
inputs:
- {name: data, type: ApacheArrowFeather}
outputs:
- {name: output_data, type: ApacheParquet}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def convert_apache_arrow_feather_to_apache_parquet(
          data_path,
          output_data_path,
      ):
          '''Converts Apache Arrow Feather to Apache Parquet.

          [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
          [Apache Parquet](https://parquet.apache.org/)

          Annotations:
              author: Alexey Volkov <alexey.volkov@ark-kun.com>
          '''
          from pyarrow import feather, parquet

          table = feather.read_table(data_path)
          parquet.write_table(table, output_data_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Convert apache arrow feather to apache parquet', description='Converts Apache Arrow Feather to Apache Parquet.\n\n [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov <alexey.volkov@ark-kun.com>')
      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = convert_apache_arrow_feather_to_apache_parquet(**_parsed_args)

      _output_serializers = [
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --data
    - {inputPath: data}
    - --output-data
    - {outputPath: output_data}
26 changes: 26 additions & 0 deletions components/_converters/ApacheParquet/from_CSV/component.py
@@ -0,0 +1,26 @@
from kfp.components import InputPath, OutputPath, create_component_from_func

def convert_csv_to_apache_parquet(
    data_path: InputPath('CSV'),
    output_data_path: OutputPath('ApacheParquet'),
):
    '''Converts CSV table to Apache Parquet.

    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    from pyarrow import csv, parquet

    table = csv.read_csv(data_path)
    parquet.write_table(table, output_data_path)


if __name__ == '__main__':
    create_component_from_func(
        convert_csv_to_apache_parquet,
        output_component_file='component.yaml',
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1']
    )
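A similar local check can confirm the column types that pyarrow's CSV reader infers before the data ever reaches a pipeline. A rough sketch (not part of the component), assuming pyarrow is installed and using throwaway /tmp paths:

from pyarrow import parquet

# Convert a two-row CSV file and print the schema recorded in the Parquet output.
with open('/tmp/sample.csv', 'w') as f:
    f.write('tips,trip_miles\n1.0,0.3\n2.5,4.2\n')

convert_csv_to_apache_parquet('/tmp/sample.csv', '/tmp/sample_csv.parquet')
print(parquet.read_schema('/tmp/sample_csv.parquet'))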
72 changes: 72 additions & 0 deletions components/_converters/ApacheParquet/from_CSV/component.yaml
@@ -0,0 +1,72 @@
name: Convert csv to apache parquet
description: |-
  Converts CSV table to Apache Parquet.

  [Apache Parquet](https://parquet.apache.org/)

  Annotations:
      author: Alexey Volkov <alexey.volkov@ark-kun.com>
inputs:
- {name: data, type: CSV}
outputs:
- {name: output_data, type: ApacheParquet}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def convert_csv_to_apache_parquet(
          data_path,
          output_data_path,
      ):
          '''Converts CSV table to Apache Parquet.

          [Apache Parquet](https://parquet.apache.org/)

          Annotations:
              author: Alexey Volkov <alexey.volkov@ark-kun.com>
          '''
          from pyarrow import csv, parquet

          table = csv.read_csv(data_path)
          parquet.write_table(table, output_data_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Convert csv to apache parquet', description='Converts CSV table to Apache Parquet.\n\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov <alexey.volkov@ark-kun.com>')
      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = convert_csv_to_apache_parquet(**_parsed_args)

      _output_serializers = [
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --data
    - {inputPath: data}
    - --output-data
    - {outputPath: output_data}
26 changes: 26 additions & 0 deletions components/_converters/ApacheParquet/from_TSV/component.py
@@ -0,0 +1,26 @@
from kfp.components import InputPath, OutputPath, create_component_from_func

def convert_tsv_to_apache_parquet(
    data_path: InputPath('TSV'),
    output_data_path: OutputPath('ApacheParquet'),
):
    '''Converts TSV table to Apache Parquet.

    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    from pyarrow import csv, parquet

    table = csv.read_csv(data_path, parse_options=csv.ParseOptions(delimiter='\t'))
    parquet.write_table(table, output_data_path)


if __name__ == '__main__':
    create_component_from_func(
        convert_tsv_to_apache_parquet,
        output_component_file='component.yaml',
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1']
    )
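The only difference from the CSV converter is the tab delimiter passed via csv.ParseOptions. A quick way to verify the delimiter handling locally (not part of the component), assuming pyarrow is installed and using a throwaway /tmp file:

from pyarrow import parquet

# Convert a small tab-separated file and check that both columns were parsed.
with open('/tmp/sample.tsv', 'w') as f:
    f.write('tips\ttrip_miles\n1.0\t0.3\n2.5\t4.2\n')

convert_tsv_to_apache_parquet('/tmp/sample.tsv', '/tmp/sample_tsv.parquet')
print(parquet.read_table('/tmp/sample_tsv.parquet').column_names)  # expected: ['tips', 'trip_miles']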
72 changes: 72 additions & 0 deletions components/_converters/ApacheParquet/from_TSV/component.yaml
@@ -0,0 +1,72 @@
name: Convert tsv to apache parquet
description: |-
  Converts TSV table to Apache Parquet.

  [Apache Parquet](https://parquet.apache.org/)

  Annotations:
      author: Alexey Volkov <alexey.volkov@ark-kun.com>
inputs:
- {name: data, type: TSV}
outputs:
- {name: output_data, type: ApacheParquet}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'pyarrow==0.17.1' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install
      --quiet --no-warn-script-location 'pyarrow==0.17.1' --user) && "$0" "$@"
    - python3
    - -u
    - -c
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def convert_tsv_to_apache_parquet(
          data_path,
          output_data_path,
      ):
          '''Converts TSV table to Apache Parquet.

          [Apache Parquet](https://parquet.apache.org/)

          Annotations:
              author: Alexey Volkov <alexey.volkov@ark-kun.com>
          '''
          from pyarrow import csv, parquet

          table = csv.read_csv(data_path, parse_options=csv.ParseOptions(delimiter='\t'))
          parquet.write_table(table, output_data_path)

      import argparse
      _parser = argparse.ArgumentParser(prog='Convert tsv to apache parquet', description='Converts TSV table to Apache Parquet.\n\n [Apache Parquet](https://parquet.apache.org/)\n\n Annotations:\n author: Alexey Volkov <alexey.volkov@ark-kun.com>')
      _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--output-data", dest="output_data_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = convert_tsv_to_apache_parquet(**_parsed_args)

      _output_serializers = [
      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --data
    - {inputPath: data}
    - --output-data
    - {outputPath: output_data}
27 changes: 27 additions & 0 deletions components/_converters/ApacheParquet/to_ApacheArrowFeather/component.py
@@ -0,0 +1,27 @@
from kfp.components import InputPath, OutputPath, create_component_from_func

def convert_apache_parquet_to_apache_arrow_feather(
    data_path: InputPath('ApacheParquet'),
    output_data_path: OutputPath('ApacheArrowFeather'),
):
    '''Converts Apache Parquet to Apache Arrow Feather.

    [Apache Arrow Feather](https://arrow.apache.org/docs/python/feather.html)
    [Apache Parquet](https://parquet.apache.org/)

    Annotations:
        author: Alexey Volkov <alexey.volkov@ark-kun.com>
    '''
    from pyarrow import feather, parquet

    data_frame = parquet.read_pandas(data_path).to_pandas()
    feather.write_feather(data_frame, output_data_path)


if __name__ == '__main__':
    convert_apache_parquet_to_apache_arrow_feather_op = create_component_from_func(
        convert_apache_parquet_to_apache_arrow_feather,
        output_component_file='component.yaml',
        base_image='python:3.7',
        packages_to_install=['pyarrow==0.17.1', 'pandas==1.0.3']
    )
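Unlike the other converters, this one goes through pandas (parquet.read_pandas(...).to_pandas()), which is why pandas appears in packages_to_install. A rough local round-trip check (not part of the component), assuming pyarrow and pandas are installed and using throwaway /tmp paths:

import pandas as pd
import pyarrow as pa
from pyarrow import feather, parquet

# Write a small DataFrame to Parquet, convert it to Feather, and read the DataFrame back.
df = pd.DataFrame({'fare': [5.0, 12.25], 'tolls': [0.0, 1.5]})
parquet.write_table(pa.Table.from_pandas(df), '/tmp/trips.parquet')

convert_apache_parquet_to_apache_arrow_feather('/tmp/trips.parquet', '/tmp/trips.feather')
print(feather.read_feather('/tmp/trips.feather'))  # prints the round-tripped DataFrame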
