BigFlow provides support for the following data processing technologies on GCP:
- Dataflow (Apache Beam)
- BigQuery
However, you are not limited to these technologies. The only limitation is the Python language.
The utilities provided by BigFlow solve problems that you would have to solve anyway. They make your job easier. Example use cases:
- Configuring a Beam pipeline.
- Creating a BigQuery table.
- Performing read/write operations on a BigQuery table.
- Building a Python package from your project for a Beam pipeline.
The BigFlow project starter provides examples for each technology. Before you dive into the next chapters, create an example project using the starter.
A standard BigFlow project is a Python package. It just so happens that Apache Beam supports running jobs from a Python package. Thanks to that, running Beam jobs requires almost no additional support.
The BigFlow project starter provides an example Beam workflow called `wordcount`. The file `pipeline.py` contains the `dataflow_pipeline_options` function:
def dataflow_pipeline_options():
return dict(
project=workflow_config['gcp_project_id'],
staging_location=f"gs://{workflow_config['staging_location']}",
temp_location=f"gs://{workflow_config['temp_location']}",
region=workflow_config['region'],
machine_type=workflow_config['machine_type'],
max_num_workers=2,
autoscaling_algorithm='THROUGHPUT_BASED',
runner='DataflowRunner',
service_account_email='your-service-account',
)
This function returns a dictionary with pipeline options.
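As described later in this chapter, `BeamJob` accepts either such a dictionary or an `apache_beam.PipelineOptions` instance. If you prefer the latter, you can convert the dictionary yourself; a minimal sketch (not part of the starter project):
from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to passing the dictionary directly to BeamJob.
options = PipelineOptions.from_dictionary(dataflow_pipeline_options())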
The pipeline configuration contains the `staging_location` and `temp_location` directories. These directories are placed in a Cloud Storage bucket. Beam uses these directories during processing to store temp files. The specified bucket and directories are not created automatically.
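If the bucket behind `staging_location` and `temp_location` does not exist yet, you have to create it yourself, for example with the `google-cloud-storage` client. A hedged sketch; the project id, bucket name, and location below are illustrative:
from google.cloud import storage

# Beam does not create the staging/temp bucket for you; create it up front.
client = storage.Client(project='your-project-id')
client.create_bucket('your-staging-and-temp-bucket', location='europe-west1')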
The second important part of the `wordcount` example is the `workflow.py` module:
import logging
import apache_beam as beam
import bigflow
from apache_beam.io import WriteToText
from bigflow.dataflow import BeamJob
from .pipeline import dataflow_pipeline_options, workflow_config
from .processing import count_words
logger = logging.getLogger(__name__)
def wordcount_entry_point(
    pipeline: beam.Pipeline,
    context: bigflow.JobContext,
    temp_location: str,
):
    logger.info(f'Running wordcount at {context.runtime_str}')
    (pipeline
     | "Count words" >> count_words()
     | "Save" >> WriteToText(f"gs://{temp_location}/beam_wordcount")
    )
wordcount_workflow = bigflow.Workflow(
workflow_id="wordcount",
log_config={
'gcp_project_id': workflow_config['gcp_project_id'],
'log_level': 'INFO',
},
definition=[BeamJob(
id='wordcount_job',
pipeline_options=dataflow_pipeline_options(),
entry_point=wordcount_entry_point,
entry_point_kwargs={
'temp_location': workflow_config['temp_location'],
},
)])
The `BeamJob` class is the recommended way of running Beam jobs in BigFlow. It takes the following arguments:
- The `id` parameter, which is part of the standard job interface.
- The `entry_point` parameter, which should be a callable (function). An entry point executes a user job, given a pipeline, a job context, and additional arguments (see `entry_point_kwargs` and `entry_point_args`).
- The `pipeline_options` parameter, which should be a Python dictionary or an instance of `apache_beam.PipelineOptions`, based on which the `BeamJob` class produces a pipeline for a driver. One of `pipeline_options` or `test_pipeline` must be provided.
- The `entry_point_kwargs` parameter, an optional dictionary. It is passed as the `**kwargs` parameter to `entry_point`.
- The `entry_point_args` parameter, an optional tuple. It is passed as the `*args` parameter to `entry_point`.
- The `wait_until_finish` parameter of bool type, set to `True` by default. It allows the Beam job to be timed out.
- The `execution_timeout` parameter of int type. If the `wait_until_finish` parameter is set to `True`, it provides the interval after which the Beam job is considered timed out. The default value is 3 hours.
- The `test_pipeline` parameter, which should be of `beam.Pipeline` type. The default value is `None`. The main purpose of this parameter is to allow providing a `TestPipeline` in tests (see the sketch after this list). One of `pipeline_options` or `test_pipeline` must be provided.
- The `use_docker_image` parameter, which indicates that a custom Docker image should be used to run the workflow on Dataflow workers.
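For example, the same entry point can be exercised locally by passing a `TestPipeline` instead of `pipeline_options`. This is only a sketch; the module layout and the job id are illustrative, not part of the starter project:
from apache_beam.testing.test_pipeline import TestPipeline
from bigflow.dataflow import BeamJob

from .pipeline import workflow_config
from .workflow import wordcount_entry_point

# Builds the wordcount job against a local TestPipeline, so no Dataflow
# resources are needed (useful in unit tests).
local_wordcount_job = BeamJob(
    id='wordcount_job_local',
    entry_point=wordcount_entry_point,
    entry_point_kwargs={'temp_location': workflow_config['temp_location']},
    test_pipeline=TestPipeline(),
)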
When Dataflow launches worker VMs, it uses Docker container images. You can specify a custom container image instead of using the default one. This gives you the ability to install non-Python software, preinstall Python dependencies, install GPU drivers, and otherwise customize the execution environment.
The simplest way to create a custom image is to use the default Beam SDK image as a base, e.g. `beam_python3.8_sdk:2.36.0`. Please keep the image version consistent with the version of `apache-beam` in `resources/requirements.txt`.
To avoid dependency clashes at runtime, we suggest running Dataflow jobs in a dockerized environment, with dependencies resolved while the Docker image is built. This way, downloading dependencies at job startup is not required. To disable dependency fetching at startup and rely only on the dependencies inside the Docker image, add `ENV PIP_NO_BUILD_ISOLATION=off` to your `Dockerfile`. Your `Dockerfile` might look like this:
# Inherit from default image.
FROM apache/beam_python3.8_sdk:2.36.0
# Put all custom files to `/app`
WORKDIR /app
# Prevents dependency fetching on a Dataflow job start-up
ENV PIP_NO_BUILD_ISOLATION=off
# Preinstall python packages (improve docker layer caching).
# This step is optional, but may improve docker image building time.
COPY ./resources/requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Install bigflow project 'whl' package.
COPY ./dist dist
RUN pip install dist/*.whl
Then you need to enable the custom image by passing `use_docker_image=True` to the `BeamJob` instance. The value `True` means that the same Docker image is used to run the project's workflows and to run Beam pipelines. You might also specify a different image by passing its full id instead of `True`.
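For example, the `wordcount` job from the starter could be switched to the project's own image like this (a sketch; only the `use_docker_image` argument is new compared to the earlier definition):
from bigflow.dataflow import BeamJob

from .pipeline import dataflow_pipeline_options, workflow_config
from .workflow import wordcount_entry_point

wordcount_job = BeamJob(
    id='wordcount_job',
    pipeline_options=dataflow_pipeline_options(),
    entry_point=wordcount_entry_point,
    entry_point_kwargs={'temp_location': workflow_config['temp_location']},
    # Use the project's Docker image on Dataflow workers
    # (or pass a full image id here instead of True).
    use_docker_image=True,
)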
Please refer to the Dataflow documentation for more details about how to build a custom image.
Dependency clashes in an Apache Beam job running on Dataflow result in a "hanging" (not making any progress but also not failing instantly) job. It's a common issue, so we have created a tool that helps avoid such cases.
The tool is an automatic dependency clash resolver. To use the resolver in your project, run the following command:
bf codegen pin-dataflow-requirements
It will go through your `resources/requirements.txt` and generate the `dataflow_pins.in` file, containing pinned dependencies. The pin file will be automatically linked to your `requirements.in`.
To rebuild your requirements with the pins included, run the `build-requirements` command:
bf build-requirements
The mechanism is deprecated because it was hard to use and maintain. You should delete the `resources/dataflow_pins.in` file, remove the link from the `requirements.in` file, and rebuild the `requirements.txt` file by running the `bf build-requirements` command.
BigFlow provides comprehensive support for BigQuery. Example use cases:
- Ad-hoc queries (fits well in a Jupyter notebook).
- Creating data processing pipelines.
- Creating BigQuery sensors.
To start using the BigQuery utils, install the `bigflow[bigquery]` extras:
pip install bigflow[bigquery]
The project starter generates a workflow called `internationalports`. This workflow is based solely on BigQuery. The whole workflow fits into the single `internationalports.workflow` module:
from bigflow import Workflow
from bigflow.bigquery import DatasetConfig, Dataset
dataset_config = DatasetConfig(
env='dev',
project_id='your-project-id',
dataset_name='internationalports',
internal_tables=['ports', 'polish_ports'],
external_tables={})
dataset: Dataset = dataset_config.create_dataset_manager()
create_polish_ports_table = dataset.create_table('''
CREATE TABLE IF NOT EXISTS polish_ports (
port_name STRING,
port_latitude FLOAT64,
port_longitude FLOAT64)
''')
create_ports_table = dataset.create_table('''
CREATE TABLE IF NOT EXISTS ports (
port_name STRING,
port_latitude FLOAT64,
port_longitude FLOAT64,
country STRING,
index_number STRING)
''')
select_polish_ports = dataset.write_truncate('polish_ports', '''
SELECT port_name, port_latitude, port_longitude
FROM `{ports}`
WHERE country = 'POL'
''', partitioned=False)
populate_ports_table = dataset.collect('''
INSERT INTO `{ports}` (port_name, port_latitude, port_longitude, country, index_number)
VALUES
('GDYNIA', 54.533333, 18.55, 'POL', '28740'),
('GDANSK', 54.35, 18.666667, 'POL', '28710'),
('SANKT-PETERBURG', 59.933333, 30.3, 'RUS', '28370'),
('TEXAS', 34.8, 31.3, 'USA', '28870');
''')
internationalports_workflow = Workflow(
workflow_id='internationalports',
definition=[
create_ports_table.to_job(id='create_ports_table'),
create_polish_ports_table.to_job(id='create_polish_ports_table'),
populate_ports_table.to_job(id='populate_ports_table'),
select_polish_ports.to_job(id='select_polish_ports'),
],
schedule_interval='@once')
There are two notable elements in the `internationalports.workflow` module:
- The `DatasetConfig` class, which defines a BigQuery dataset you want to interact with.
- The `dataset: Dataset` object, which allows you to perform various operations on the defined dataset.
Using a dataset object you can describe operations that you want to perform. Next, you arrange them into a workflow.
Take a look at the example operation, which creates a new table:
create_polish_ports_table = dataset.create_table('''
CREATE TABLE IF NOT EXISTS polish_ports (
port_name STRING,
port_latitude FLOAT64,
port_longitude FLOAT64)
''')
The `create_polish_ports_table` object is a lazy operation. Lazy means that calling the `dataset.create_table` method won't actually create a table. You first need to turn the operation into a job; then you can run it:
create_polish_ports_table.to_job(id='create_polish_ports_table').run()
Or put the job into a workflow (note that there is no `run()` invocation):
internationalports_workflow = Workflow(
workflow_id='internationalports',
definition=[
create_polish_ports_table.to_job(id='create_polish_ports_table'),
],
schedule_interval='@once')
And then run it using the CLI:
bf run --job internationalports.create_polish_ports_table
Now, let us go through `DatasetConfig` and `Dataset` in detail.
`DatasetConfig` is a convenient extension of `Config`, designed for workflows which use a `Dataset` object to call BigQuery SQL. `DatasetConfig` defines the following properties, which are required by a `Dataset` object:
- `project_id` — GCP project id of the internal dataset.
- `dataset_name` — Internal dataset name.
- `internal_tables` — List of table names in the internal dataset. Fully qualified names of internal tables are resolved to `{project_id}.{dataset_name}.{table_name}`.
- `external_tables` — Dict that defines aliases for external table names. Fully qualified names of those tables have to be declared explicitly.
- `job_labels` — Dict of labels that will be set on BigQuery jobs.
The distinction between internal and external tables shouldn't be treated too seriously. Internal means "mine". External means any other. It's just a naming convention.
For example:
from bigflow import DatasetConfig
INTERNAL_TABLES = ['quality_metric']
EXTERNAL_TABLES = {
'offer_ctr': 'not-my-project.offer_scorer.offer_ctr_long_name',
'box_ctr': 'other-project.box_scorer.box_ctr'
}
dataset_config = DatasetConfig(env='dev',
project_id='my-project-dev',
dataset_name='scorer',
internal_tables=INTERNAL_TABLES,
external_tables=EXTERNAL_TABLES
)\
.add_configuration('prod',
project_id='my-project-prod')
Having that, a `Dataset` object can be easily created:
dataset = dataset_config.create_dataset()
Then you can use short table names in SQL; a `Dataset` object resolves them to fully qualified names.
For example, in this SQL, a short name of an internal table:
dataset.collect('select * from {quality_metric}')
is resolved to `my-project-dev.scorer.quality_metric`.
In this SQL, an alias of an external table:
dataset.collect('select * from {offer_ctr}')
is resolved to `not-my-project.offer_scorer.offer_ctr_long_name`.
A `Dataset` object allows you to perform various operations on a dataset. All the methods are lazy and return a `BigQueryOperation` object.
You can turn a lazy operation into a job or simply run it (useful for ad-hoc queries or debugging).
create_target_table_operation = dataset.write_truncate('target_table', '''
SELECT *
FROM `{another_table}`
''')
# create a job
job_which_you_can_put_into_workflow = create_target_table_operation.to_job('create_target_table')
# or run the operation
create_target_table_operation.run()
The SQL code that you provide to these methods is templated (as mentioned in the previous section). Besides the configuration parameters, you can access the runtime parameter. It's available as the `dt` variable. For example:
dataset.write_truncate('target_table', '''
SELECT *
FROM `{another_table}`
WHERE PARTITION_TIME = '{dt}'
''')
The `write_truncate` method takes a SQL query, executes it, and saves the result into a specified table.
dataset.write_truncate('target_table', '''
SELECT *
FROM `{another_table}`
''')
This method overwrites all data in a table or in a single partition of it, depending on the `partitioned` parameter.
dataset.write_truncate('target_table', '''
SELECT *
FROM `{another_table}`
''', partitioned=False)
The `write_truncate` method also expects that the specified table exists. It won't create a new table from a query result.
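In a workflow, this typically means pairing `write_truncate` with an explicit `create_table` operation. A minimal sketch that reuses the `dataset` manager and the `{another_table}` alias from the examples above (table, job, and workflow names are illustrative):
from bigflow import Workflow

# write_truncate assumes 'target_table' already exists, so create it first.
create_target_table = dataset.create_table('''
CREATE TABLE IF NOT EXISTS target_table (
    some_field STRING)
''')

fill_target_table = dataset.write_truncate('target_table', '''
SELECT some_field
FROM `{another_table}`
''')

example_workflow = Workflow(
    workflow_id='example_workflow',
    definition=[
        create_target_table.to_job(id='create_target_table'),
        fill_target_table.to_job(id='fill_target_table'),
    ],
    schedule_interval='@once')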
The `write_append` method acts almost the same as `write_truncate`. The difference is that `write_append` doesn't overwrite any data, but appends new records.
dataset.write_append('target_table', '''
SELECT *
FROM `{another_table}`
''')
The `write_tmp` method allows you to create or overwrite a non-partitioned table from a query result.
dataset.write_tmp('target_temporary_table', '''
SELECT *
FROM `{another_table}`
''')
This method creates a table schema from a query result. We recommend using this method only for ad-hoc queries. For workflows, we recommend creating tables explicitly (so you can control a table schema).
The `collect` method allows you to fetch query results into a Pandas DataFrame.
rows: pd.DataFrame = dataset.collect('''
SELECT *
FROM `{another_table}`
''').run()
Note that to fetch a result, you need to call the `run` method.
The `collect_list` method works almost the same as the `collect` method; the difference is that it returns a Python `list`.
rows: list = dataset.collect_list('''
SELECT *
FROM `{another_table}`
''').run()
The `create_table` method allows you to create a table.
create_my_new_table_operation = dataset.create_table('''
CREATE TABLE IF NOT EXISTS my_new_table (
some_field STRING,
another_field FLOAT64)
''')
The `sensor` function allows your workflow to wait for data in a specified table (filtered by the `where_clause` argument).
from bigflow import Workflow
from bigflow.bigquery import sensor, DatasetConfig, Dataset
dataset_config = DatasetConfig(
env='dev',
project_id='your-project-id',
dataset_name='internationalports',
internal_tables=['ports'],
external_tables={})
dataset: Dataset = dataset_config.create_dataset_manager()
wait_for_polish_ports = sensor('ports',
where_clause='country = "POL"',
ds=dataset).to_job(retry_count=144, retry_pause_sec=600)
workflow = Workflow(
workflow_id='internationalports',
definition=[
wait_for_polish_ports,
# another jobs which rely on the ports table
],
schedule_interval='@once')
The `table_labels` and `dataset_labels` parameters allow your workflow to create or override labels on a BigQuery table and dataset. On the first run, the tables are not created yet, so labels cannot be created then. Labels are added on the second and later runs, when the tables already exist.
from bigflow.bigquery import DatasetConfig
dataset_config = DatasetConfig(
env='dev',
project_id='your-project-id',
dataset_name='example_dataset',
internal_tables=['example_table'],
external_tables={},
table_labels={
'example_table': {
"label_1": "value_1",
"label_2": "value_2"
}
},
dataset_labels={"dataset_label_1": "value_1", "dataset_label_2": "value_2"}).create_dataset_manager()
The `job_labels` argument allows you to label BigQuery jobs. It is passed to `QueryJobConfig.labels` in the `write` and `collect` methods of `DatasetManager`.
from bigflow.bigquery import DatasetConfig
dataset_config = DatasetConfig(
env='dev',
project_id='your-project-id',
dataset_name='example_dataset',
internal_tables=['example_table'],
external_tables={},
job_labels={"owner": "John Doe"}).create_dataset_manager()
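With this configuration, queries issued through the returned dataset manager via `collect` or `write` run as labeled BigQuery jobs; a minimal sketch reusing the manager defined above:
# Runs as a BigQuery job labeled with {"owner": "John Doe"}.
rows = dataset_config.collect('''
SELECT * FROM `{example_table}`
''').run()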