This repository contains examples of using Snowflake with Apache Beam; specifically, it covers batching, streaming, and cross-language usage examples.
- Create a Snowflake account with Google Cloud Platform as the cloud provider.
- Make sure that your user has the default role set to ACCOUNTADMIN:
GRANT ROLE ACCOUNTADMIN TO user <USERNAME>;
alter user <USERNAME> set default_role=ACCOUNTADMIN;
- Make sure that your user has a default warehouse set:
alter user <USERNAME> set default_warehouse=COMPUTE_WH;
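You can verify both defaults with:
DESC USER <USERNAME>;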
- Create a new Snowflake database:
create database <DATABASE NAME>;
- Create a Google Cloud Platform account.
- Create a new GCP project.
- Create a GCS bucket.
- Create a storage integration object in Snowflake using the following command:
CREATE OR REPLACE STORAGE INTEGRATION <INTEGRATION NAME> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = GCS ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET NAME>/');
Please note that the gcs prefix is used here, not gs.
- Authorize Snowflake to operate on your bucket by following Step 3: Grant the Service Account Permissions to Access Bucket Objects in the Snowflake documentation.
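To complete that grant you need the service account Snowflake created for the integration. One way to find it and grant it access to the bucket (a sketch; roles/storage.objectAdmin is an assumption, use whatever role your setup requires):
DESC STORAGE INTEGRATION <INTEGRATION NAME>;
gsutil iam ch serviceAccount:<STORAGE_GCP_SERVICE_ACCOUNT>:roles/storage.objectAdmin gs://<BUCKET NAME>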
- Set up gcloud on your computer by following Using the Google Cloud SDK installer.
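After installation, a typical initialization (a sketch; adjust to your environment) is:
gcloud auth application-default login
gcloud config set project <GCP PROJECT NAME>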
- Run one of the provided examples.
This example demonstrates batch writing into Snowflake and batch reading from Snowflake. It is inspired by the Apache Beam WordCount example.
The example consists of two pipelines:
- Writing into Snowflake:
  - Reading files from the location provided by the inputFile argument.
  - Counting words.
  - Writing the counts into the Snowflake table provided by the tableName argument.
- Reading from Snowflake:
  - Reading the counts from the Snowflake table provided by the tableName argument.
  - Writing the counts into the location provided by the output argument.
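The Snowflake-specific parts of these two pipelines boil down to SnowflakeIO.write() and SnowflakeIO.read(). Below is a minimal sketch using the Apache Beam Java SnowflakeIO API as documented for recent Beam releases; the options getters, the counts and pipeline variables, and the CSV mapping are assumptions and may differ from the code in this repository.
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Connection configuration shared by the write and read pipelines,
// built from the command-line options shown above.
SnowflakeIO.DataSourceConfiguration dc =
    SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(options.getUsername(), options.getPassword())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withSchema(options.getSchema());

// Writing into Snowflake: each KV<word, count> becomes one table row.
counts.apply(
    SnowflakeIO.<KV<String, Long>>write()
        .withDataSourceConfiguration(dc)
        .to(options.getTableName())
        .withStagingBucketName(options.getStagingBucketName())
        .withStorageIntegrationName(options.getStorageIntegration())
        .withUserDataMapper(
            (SnowflakeIO.UserDataMapper<KV<String, Long>>)
                kv -> new Object[] {kv.getKey(), kv.getValue()}));

// Reading from Snowflake: every staged CSV row is mapped back to a "word count" line.
PCollection<String> countLines =
    pipeline.apply(
        SnowflakeIO.<String>read()
            .withDataSourceConfiguration(dc)
            .fromTable(options.getTableName())
            .withStagingBucketName(options.getStagingBucketName())
            .withStorageIntegrationName(options.getStorageIntegration())
            .withCsvMapper((SnowflakeIO.CsvMapper<String>) parts -> parts[0] + " " + parts[1])
            .withCoder(StringUtf8Coder.of()));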
- Run the batching example by executing the following command:
./gradlew run -PmainClass=batching.WordCountExample --args=" --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<GCS BUCKET NAME>/counts --serverName=<SNOWFLAKE SERVER NAME> --username=<SNOWFLAKE USERNAME> --password=<SNOWFLAKE PASSWORD> --database=<SNOWFLAKE DATABASE> --schema=<SNOWFLAKE SCHEMA> --tableName=<SNOWFLAKE TABLE NAME> --storageIntegration=<SNOWFLAKE STORAGE INTEGRATION NAME> --stagingBucketName=<GCS BUCKET NAME> --runner=<DirectRunner/DataflowRunner> --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> --gcpTempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING> --region=<FOR DATAFLOW RUNNER: GCP REGION> --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
- Go to the Snowflake console to check the saved counts:
select * from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
- Go to the GCS bucket to check the saved files.
- Go to Dataflow to check the submitted jobs.
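Both checks can also be done from the command line (a sketch, assuming gsutil and gcloud are installed and authenticated):
gsutil ls gs://<GCS BUCKET NAME>/counts*
gcloud dataflow jobs list --region=<GCP REGION>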
This example streams taxi rides from PubSub into Snowflake.
- Create a Snowflake table that will hold the taxi rides:
create or replace table <TABLE NAME> ( ride_id string , long string , lat string );
- Create a Snowflake stage:
create or replace stage <STAGE NAME> url = 'gcs://<GCS BUCKET NAME>/data/' storage_integration = <INTEGRATION NAME>;
- Create a key pair for the authentication process.
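A key pair can be generated with OpenSSL, for example (a sketch following Snowflake's key-pair authentication instructions; the file names are just examples, and the passphrase you enter is the one passed later as privateKeyPassphrase):
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub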
- Set the public key for your user by executing the following command:
alter user <USERNAME> set rsa_public_key='<PUBLIC KEY>';
- Create a Snowflake Snowpipe:
CREATE OR REPLACE PIPE <DATABASE NAME>.<SCHEMA NAME>.<PIPE NAME> AS COPY INTO <TABLE NAME> from @<STAGE NAME>;
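You can verify that the pipe exists and is running with:
SELECT SYSTEM$PIPE_STATUS('<DATABASE NAME>.<SCHEMA NAME>.<PIPE NAME>');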
- Run the streaming example by executing the following command:
./gradlew run -PmainClass=streaming.TaxiRidesExample --args=" --serverName=<SNOWFLAKE SERVER NAME> --username=<SNOWFLAKE USERNAME> --privateKeyPath=<PRIVATE KEY PATH> --privateKeyPassphrase=<PRIVATE KEY PASSPHRASE> --database=<SNOWFLAKE DATABASE> --schema=<SNOWFLAKE SCHEMA> --snowPipe=<SNOWFLAKE SNOWPIPE NAME> --storageIntegration=<SNOWFLAKE STORAGE INTEGRATION NAME> --stagingBucketName=<GCS BUCKET NAME> --runner=<DirectRunner/DataflowRunner> --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> --region=<FOR DATAFLOW RUNNER: GCP REGION> --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
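The streaming write itself combines key-pair authentication with the Snowpipe created above. The following is a rough sketch using the SnowflakeIO streaming API as documented for recent Beam releases; the TaxiRide type, the rides collection, the flush limits, and the options getters are assumptions mirroring the flags above.
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.joda.time.Duration;

// Key-pair authentication matching the --privateKeyPath / --privateKeyPassphrase flags.
SnowflakeIO.DataSourceConfiguration dc =
    SnowflakeIO.DataSourceConfiguration.create()
        .withKeyPairPathAuth(
            options.getUsername(),
            options.getPrivateKeyPath(),
            options.getPrivateKeyPassphrase())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withSchema(options.getSchema());

// Unbounded writes go through the Snowpipe; rows are flushed to the staging bucket in batches
// and land in the ride_id, long, lat columns of the table created above.
rides.apply(
    SnowflakeIO.<TaxiRide>write()
        .withDataSourceConfiguration(dc)
        .withSnowPipe(options.getSnowPipe())
        .withStagingBucketName(options.getStagingBucketName())
        .withStorageIntegrationName(options.getStorageIntegration())
        .withFlushTimeLimit(Duration.standardSeconds(30))
        .withFlushRowLimit(500)
        .withUserDataMapper(
            (SnowflakeIO.UserDataMapper<TaxiRide>)
                ride -> new Object[] {ride.rideId, ride.longitude, ride.latitude}));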
- Go to the Snowflake console to check the saved taxi rides:
select * from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
- Go to the GCS bucket to check the saved files.
- Go to Dataflow to check the submitted jobs.
This example shows simple cross-language usage by writing objects into Snowflake and reading them back from Snowflake.
Currently, cross-language pipelines are supported in a stable manner only by the Apache Flink runner, but the plan is to support all runners. For more information about cross-language, please see the multi-SDK efforts and Beam on top of Flink articles.
Please see Apache Beam with Flink runner for the setup. The specific setup for the current version of the Snowflake connector is as follows:
- Set up a Flink cluster by following the Flink Setup Quickstart or Setting up Apache Flink on Mac OS X.
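For a local standalone cluster from the Flink quickstart, starting it typically comes down to (an assumption; the path depends on your Flink distribution):
./bin/start-cluster.sh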
- Download Job server image:
docker pull gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
- Download Apache Beam Java SDK image:
docker pull gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev
- Change tag of downloaded Java SDK image to make the whole setup work:
docker tag gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev apache/beam_java_sdk:2.20.0.dev
- Start Job server:
docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
- Download the Apache Beam Python SDK wheel (apachebeam_snowflake.whl).
- Install the Apache Beam Python SDK using Python 2.7:
python -m pip install apachebeam_snowflake.whl
- Set the following variables inside xlang_example.py:
SERVER_NAME = <SNOWFLAKE SERVER NAME>
USERNAME = <SNOWFLAKE USERNAME>
PASSWORD = <SNOWFLAKE PASSWORD>
SCHEMA = <SNOWFLAKE SCHEMA>
DATABASE = <SNOWFLAKE DATABASE>
STAGING_BUCKET_NAME = <GCS BUCKET NAME>
STORAGE_INTEGRATION = <SNOWFLAKE STORAGE INTEGRATION NAME>
TABLE = <SNOWFLAKE TABLE NAME>
- Run xlang_example.py:
python xlang_example.py
- Go to the Flink console to check the submitted job.
- Go to the GCS bucket to check the saved files.
- Check the console output.