Skip to content

Commit

Permalink
use fastavro to address #412 (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Ferriero authored Feb 25, 2020
1 parent 7822765 commit 9b7b23f
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 130 deletions.
135 changes: 135 additions & 0 deletions examples/dataflow-data-generator/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from data_generator.PerformantDataGenerator import DataGenerator, FakeRowGen, \
parse_data_generator_args, validate_data_args, fetch_schema
import avro.schema
import fastavro

from data_generator.CsvUtil import dict_to_csv
from data_generator.AvroUtil import fix_record_for_avro
Expand Down Expand Up @@ -83,17 +83,18 @@ def run(argv=None):
)

if data_args.avro_schema_file:
avsc = avro.schema.Parse(open(data_args.avro_schema_file, 'rb').read())
fastavro_avsc = fastavro.schema.load_schema(data_args.avro_schema_file)

(rows
# Need to convert time stamps from strings to timestamp-micros
| 'Fix date and time Types for Avro.' >>
beam.FlatMap(lambda row: fix_record_for_avro(row, avsc))
beam.FlatMap(lambda row: fix_record_for_avro(row, fastavro_avsc))
| 'Write to Avro.' >> beam.io.avroio.WriteToAvro(
file_path_prefix=data_args.output_prefix,
codec='null',
file_name_suffix='.avro',
use_fastavro=True,
schema=avsc))
schema=fastavro_avsc))

if data_args.output_bq_table:
(rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
# limitations under the License.

import avro.schema
import fastavro
import json
from .TimeUtil import datetime_to_epoch_timestamp, date_to_epoch_date, \
time_to_epoch_time


def fix_record_for_avro(record, avro_schema):
for field in avro_schema.fields:
field_name = field.name
datatype = field.type.to_json()
for field in avro_schema['fields']:
field_name = field['name']
datatype = field['type']
if isinstance(datatype, dict):
# This is a record type definition so we need to recurse a level deeper.
record[field_name] = fix_record_for_avro(
record[field_name], avro.schema.Parse(json.dumps(datatype)))[0]
record[field_name], fastavro.parse_schema(datatype))[0]
elif isinstance(datatype, list) and isinstance(datatype[1], dict):
logical_type = datatype[1].get('logicalType', None)
if logical_type:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from data_generator.PrettyDataGenerator import DataGenerator, FakeRowGen, \
parse_data_generator_args, validate_data_args, fetch_schema,\
write_n_line_file_to_gcs
import avro.schema
import fastavro
import os

from data_generator.CsvUtil import dict_to_csv
Expand Down Expand Up @@ -124,18 +124,18 @@ def run(argv=None):
)

if data_args.avro_schema_file:
avsc = avro.schema.parse(open(data_args.avro_schema_file, 'rb').read())
fastavro_avsc = fastavro.schema.load_schema(data_args.avro_schema_file)

(rows
# Need to convert time stamps from strings to timestamp-micros
| 'Fix date and time Types for Avro.' >>
beam.FlatMap(lambda row: fix_record_for_avro(row, avsc))
beam.FlatMap(lambda row: fix_record_for_avro(row, fastavro_avsc))
| 'Write to Avro.' >> beam.io.avroio.WriteToAvro(
file_path_prefix=data_args.output_prefix,
codec='null',
file_name_suffix='.avro',
use_fastavro=True,
schema=avsc))
schema=fastavro_avsc))

if data_args.output_bq_table:
(rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
parse_data_generator_args, validate_data_args, fetch_schema,\
write_n_line_file_to_gcs

import avro.schema
import fastavro
import os

Expand Down Expand Up @@ -106,13 +105,12 @@ def run(argv=None):
)

if data_args.avro_schema_file:
avsc = avro.schema.parse(open(data_args.avro_schema_file, 'rb').read())
fastavro_avsc = fastavro.schema.load_schema(data_args.avro_schema_file)

(rows
# Need to convert time stamps from strings to timestamp-micros
| 'Fix date and time Types for Avro.' >>
beam.FlatMap(lambda row: fix_record_for_avro(row, avsc))
beam.FlatMap(lambda row: fix_record_for_avro(row, fastavro_avsc))
| 'Write to Avro.' >> beam.io.avroio.WriteToAvro(
file_path_prefix=data_args.output_prefix,
codec='null',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
author="Jacob Ferriero",
author_email="jferriero@google.com",
install_requires=[
'apache-beam[gcp]>=2.8.0', 'Faker>=0.8.13', 'faker-schema>=0.1.4',
'google-cloud>=0.32', 'google-cloud-bigquery>=1.1.0',
'google-cloud-dataflow>=2.4.0', 'google-cloud-datastore>=1.7.0',
'google-cloud-pubsub>=0.30.1', 'google-cloud-storage>=1.6.0',
'google-cloud-vision>=0.31.0', 'mock>=2.0.0', 'numpy>=1.14.2',
'pandas>=0.23.4', 'six>=1.10.0', 'scipy>=1.1.0'
'apache-beam[gcp]>=2.16.0', 'avro-python3>=1.8.1,!=1.9.2,<1.10.0'
'Faker>=0.8.13', 'faker-schema>=0.1.4', 'google-cloud>=0.32',
'google-cloud-bigquery>=1.1.0', 'google-cloud-pubsub>=0.30.1',
'google-cloud-storage>=1.6.0', 'google-cloud-vision>=0.31.0',
'google-resumable-media>=0.5.0', 'mock>=2.0.0', 'numpy>=1.14.2',
'pandas>=0.23.4', 'scipy>=1.1.0', 'httplib2>=0.10.3'
],
packages=['data_generator'])
Loading

0 comments on commit 9b7b23f

Please sign in to comment.