Dataflow Pipeline

Dataflow Pipeline lets you run various data pipelines without writing any code, simply by defining a configuration file.

Currently, Dataflow Pipeline primarily targets Cloud Dataflow, but support for running on Apache Spark and Apache Flink clusters is in preparation.

(Dataflow Template has been renamed to Dataflow Pipeline.)

See the documentation for usage details.

Usage Example

Dataflow Pipeline is deployed as a FlexTemplate for use with the Cloud Dataflow runner. Pipelines are assembled from the provided configuration file and executed as Cloud Dataflow jobs.

Write the following YAML file and upload it to GCS (suppose you upload it to gs://example/config.yaml).

This configuration file stores the results of a BigQuery query in the specified Cloud Spanner table.

sources:
  - name: bigquery
    module: bigquery
    parameters:
      query: |-
        SELECT
          *
        FROM
          `myproject.mydataset.mytable`
sinks:
  - name: spanner
    module: spanner
    inputs:
      - bigquery
    parameters:
      projectId: myproject
      instanceId: myinstance
      databaseId: mydatabase
      table: mytable
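
To upload the config file to GCS you can use gsutil, for example (bucket and path follow the example above):

gsutil cp path/to/config.yaml gs://example/config.yaml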

Assuming you have deployed the Dataflow Pipeline to gs://example/template, run the following command.

gcloud dataflow flex-template run bigquery-to-spanner \
  --project={gcp_project} \
  --region={region} \
  --template-file-gcs-location=gs://example/template \
  --parameters=config="$(cat path/to/config.yaml)"

The Dataflow job will start, and you can check its execution status in the Cloud Console.
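
You can also check job status from the command line, for example:

gcloud dataflow jobs list \
  --project={gcp_project} \
  --region={region}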

Deploy Template

Dataflow Pipeline is used as a FlexTemplate, so it should be deployed following the FlexTemplate creation steps.

Requirements

To build and deploy the template you will need Java and Maven, Docker, and the gcloud CLI, as used in the commands below.

Push Template Container Image to Artifact Registry

The first step is to build the source code and register it as a container image in Artifact Registry.

To upload container images to Artifact Registry via Docker commands, you will first need to run the following command for your repository's region.

gcloud auth configure-docker {region}-docker.pkg.dev
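
For example, if the repository is in us-central1 (region shown here only as an example value):

gcloud auth configure-docker us-central1-docker.pkg.dev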

The following command builds the FlexTemplate container image from the source code and uploads it to Artifact Registry.

mvn clean package -DskipTests -Dimage={region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/dataflow:latest

Upload Template File

The next step is to generate a template file that can launch a Dataflow job from the container image, and upload it to GCS using the following command.

gcloud dataflow flex-template build gs://{path/to/template_file} \
  --image "{region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/dataflow:latest" \
  --sdk-language "JAVA"
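
If you want to verify the upload, the generated template spec is a JSON file in GCS and can be inspected, for example with:

gsutil cat gs://{path/to/template_file}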

Run Dataflow Job from Template File

You can run a Dataflow job from the uploaded template file in two ways.

  • gcloud command

You can run the template by passing the config content as text, or the GCS path of an uploaded config file (the GCS-path variant is shown after the first command).

gcloud dataflow flex-template run {job_name} \
  --project={gcp_project} \
  --region={region} \
  --template-file-gcs-location=gs://{path/to/template_file} \
  --parameters=config="$(cat path/to/config.yaml)"
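
Alternatively, pass the GCS path of the uploaded config file:

gcloud dataflow flex-template run {job_name} \
  --project={gcp_project} \
  --region={region} \
  --template-file-gcs-location=gs://{path/to/template_file} \
  --parameters=config=gs://{path/to/config.yaml}
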
  • REST API

You can also run the template via the REST API.

PROJECT_ID=[PROJECT_ID]
REGION=[REGION]
CONFIG="$(cat examples/xxx.yaml)"

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://dataflow.googleapis.com/v1b3/projects/${PROJECT_ID}/locations/${REGION}/flexTemplates:launch" \
  -d "{
    \"launchParameter\": {
      \"jobName\": \"myJobName\",
      \"containerSpecGcsPath\": \"gs://{path/to/template_file}\",
      \"parameters\": {
        \"config\": \"${CONFIG}\",
        \"stagingLocation\": \"gs://{path/to/staging}\"
      },
      \"environment\": {
        \"tempLocation\": \"gs://{path/to/temp}\"
      }
    }
  }"

(The tempLocation and stagingLocation parameters are optional. If not specified, a bucket named dataflow-staging-{region}-{project_no} is automatically created and used.)

Run Template in streaming mode

To run the template in streaming mode, specify streaming=true as a parameter.

gcloud dataflow flex-template run {job_name} \
  --project={gcp_project} \
  --region={region} \
  --template-file-gcs-location=gs://{path/to/template_file} \
  --parameters=config=gs://{path/to/config.yaml} \
  --parameters=streaming=true
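
With the REST API, the same flag should go into launchParameter.parameters (which is a map of string values); a sketch based on the request body shown earlier:

  "parameters": {
    "config": "gs://{path/to/config.yaml}",
    "streaming": "true"
  }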

Build Docker image for local pipeline execution

You can run pipelines locally. This is useful when you want to process small amounts of data quickly.

# To push the image to Artifact Registry (GAR),
# first configure Docker authentication
gcloud auth configure-docker {region}-docker.pkg.dev

# Create and Upload Docker image to GAR for local run
mvn clean package -DskipTests -Pdirect -Dimage="{region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/direct"

# Pull direct container image
docker pull {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/direct

Run Pipeline locally

For local execution, run the following command to set up the Application Default Credentials the container will use.

gcloud auth application-default login

The following are examples of local execution commands. The authentication file and config file are mounted so that the container can access them. The other arguments (such as project and config) are the same as for normal execution.

If you want to run in streaming mode, specify streaming=true as an argument, just as in normal execution.

macOS

docker run \
  -v ~/.config/gcloud:/mnt/gcloud:ro \
  --rm {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/direct \
  --config="$(cat path/to/config.yaml)"

Windows

docker run ^
  -v C:\Users\{YourUserName}\AppData\Roaming\gcloud:/mnt/gcloud:ro ^
  -v C:\Users\{YourWorkingDirPath}\:/mnt/config:ro ^
  --rm {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/direct ^
  --config=/mnt/config/{MyConfig}.yaml
  • Note:
    • If you use the BigQuery module locally, you will need to specify the tempLocation argument.
    • If the pipeline needs to access an emulator running on the local machine (such as the Cloud Spanner emulator), the --net=host option is required (see the sketch below).
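
A minimal sketch combining both notes, assuming the direct-runner container accepts a tempLocation argument like the template does, and that an emulator is listening on the host:

docker run \
  --net=host \
  -v ~/.config/gcloud:/mnt/gcloud:ro \
  --rm {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/direct \
  --config="$(cat path/to/config.yaml)" \
  --tempLocation=gs://{path/to/temp}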

Build Docker image for Pipeline API server

You can build a Pipeline API server image. This is useful when you want to check config content quickly.

# Create and Upload Docker image to GAR for pipeline api server
mvn clean package -DskipTests -Pserver -Dimage="{region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/server"

# Pull server container image
docker pull {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/server

Run Pipeline API server locally

Execute the following command, then open http://localhost:8080/ in your browser.

macOS

docker run \
  -p "8080:8080" \
  -v ~/.config/gcloud:/mnt/gcloud:ro \
  --rm {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/server

Windows

docker run ^
  -p "8080:8080" ^
  -v C:\Users\{YourUserName}\AppData\Roaming\gcloud:/mnt/gcloud:ro ^
  --rm {region}-docker.pkg.dev/{deploy_project}/{template_repo_name}/server
