Skip to content

blockchain-etl/ethereum-etl-airflow

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Ethereum ETL Airflow

Read this article: https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset

Setting up Airflow DAGs using Google Cloud Composer

Create BigQuery Datasets

Create Google Cloud Storage bucket

Create Google Cloud Composer environment

Create a new Cloud Composer environment:

export ENVIRONMENT_NAME=ethereum-etl-0
gcloud composer environments create $ENVIRONMENT_NAME --location=us-central1 --zone=us-central1-a \
    --disk-size=100GB --machine-type=n1-standard-4 --node-count=3 --python-version=3 --image-version=composer-1.8.3-airflow-1.10.3 \
    --network=default --subnetwork=default

gcloud composer environments update $ENVIRONMENT_NAME --location=us-central1 --update-pypi-package=ethereum-etl==1.4.1

Create variables in Airflow (Admin > Variables in the UI):

Variable Description
ethereum_output_bucket GCS bucket to store exported files
ethereum_provider_uris Comma separated URIs of Ethereum nodes
ethereum_destination_dataset_project_id Project ID of BigQuery datasets
notification_emails email for notifications

Check other variables in dags/ethereumetl_airflow/variables.py.

Upload DAGs

> ./upload_dags.sh <airflow_bucket>

Running Tests

pip install -r requirements.txt
export PYTHONPATH='dags'
pytest -vv -s

Creating Table Definition Files for Parsing Events and Function Calls

Read this article: https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644

Miscellaneous

To upload CSVs to BigQuery:

> cd output
> gsutil -m rsync -r . gs://<your_bucket>/ethereumetl/export
  • Sign in to BigQuery https://bigquery.cloud.google.com/

  • Create a new dataset called ethereum_blockchain_raw and ethereum_blockchain

  • Load the files from the bucket to BigQuery:

> git clone https://github.com/medvedev1088/ethereum-etl-airflow.git
> cd ethereum-etl-airflow/dags/resources/stages
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.blocks gs://<your_bucket>/ethereumetl/export/blocks/*.csv ./raw/schemas/blocks.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.transactions gs://<your_bucket>/ethereumetl/export/transactions/*.csv ./raw/schemas/transactions.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.token_transfers gs://<your_bucket>/ethereumetl/export/token_transfers/*.csv ./raw/schemas/token_transfers.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 ethereum_blockchain_raw.receipts gs://<your_bucket>/ethereumetl/export/receipts/*.csv ./raw/schemas/receipts.json
> bq --location=US load --replace --source_format=NEWLINE_DELIMITED_JSON ethereum_blockchain_raw.logs gs://<your_bucket>/ethereumetl/export/logs/*.json ./raw/schemas/logs.json
> bq --location=US load --replace --source_format=NEWLINE_DELIMITED_JSON ethereum_blockchain_raw.contracts gs://<your_bucket>/ethereumetl/export/contracts/*.json ./raw/schemas/contracts.json
> bq --location=US load --replace --source_format=CSV --skip_leading_rows=1 --allow_quoted_newlines ethereum_blockchain_raw.tokens gs://<your_bucket>/ethereumetl/export/tokens/*.csv ./raw/schemas/tokens.json

Note that NEWLINE_DELIMITED_JSON is used to support REPEATED mode for the columns with lists.

Enrich blocks:

> bq mk --table --description "$(cat ./enrich/descriptions/blocks.txt | tr '\n' ' ')" --time_partitioning_field timestamp ethereum_blockchain.blocks ./enrich/schemas/blocks.json
> bq --location=US query --destination_table ethereum_blockchain.blocks --use_legacy_sql=false "$(cat ./enrich/sqls/blocks.sql | tr '\n' ' ')"

Enrich transactions:

> bq mk --table --description "$(cat ./enrich/descriptions/transactions.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.transactions ./enrich/schemas/transactions.json
> bq --location=US query --destination_table ethereum_blockchain.transactions --use_legacy_sql=false "$(cat ./enrich/sqls/transactions.sql | tr '\n' ' ')"

Enrich token_transfers:

> bq mk --table --description "$(cat ./enrich/descriptions/token_transfers.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.token_transfers ./enrich/schemas/token_transfers.json
> bq --location=US query --destination_table ethereum_blockchain.token_transfers --use_legacy_sql=false "$(cat ./enrich/sqls/token_transfers.sql | tr '\n' ' ')"

Enrich logs:

> bq mk --table --description "$(cat ./enrich/descriptions/logs.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.logs ./enrich/schemas/logs.json
> bq --location=US query --destination_table ethereum_blockchain.logs --use_legacy_sql=false "$(cat ./enrich/sqls/logs.sql | tr '\n' ' ')"

Enrich contracts:

> bq mk --table --description "$(cat ./enrich/descriptions/contracts.txt | tr '\n' ' ')" --time_partitioning_field block_timestamp ethereum_blockchain.contracts ./enrich/schemas/contracts.json
> bq --location=US query --destination_table ethereum_blockchain.contracts --use_legacy_sql=false "$(cat ./enrich/sqls/contracts.sql | tr '\n' ' ')"

Enrich tokens:

> bq mk --table --description "$(cat ./enrich/descriptions/tokens.txt | tr '\n' ' ')" ethereum_blockchain.tokens ./enrich/schemas/tokens.json
> bq --location=US query --destination_table ethereum_blockchain.tokens --use_legacy_sql=false "$(cat ./enrich/sqls/tokens.sql | tr '\n' ' ')"