This final project repo contains a real-time Coinbase market data streaming pipeline.
The diagram above summarizes the streaming pipeline. My Kafka producer, written in Python, ingests data from Coinbase and publishes it to a Confluent Kafka topic. Before consumption, ksqlDB performs the essential stream processing and transformation. On the consuming side, a managed Confluent connector acts as the Kafka consumer, picking up the transformed messages and writing them to a BigQuery table. Finally, I built a simple Looker Studio dashboard on top of that BigQuery table.
This repository fulfills the requirement for real-time monitoring of Coinbase market data updates, specifically orders and trades. Through a streaming data pipeline, it gives traders up-to-date information on trading volume and values across the various virtual currencies on Coinbase.
The project streams data from Coinbase's "Exchange Websocket Direct Market Data" feed.
The streaming data pipeline encompasses the following key aspects:
The project is developed using Confluent Cloud and BigQuery. Terraform serves as the Infrastructure as Code (IaC) tool for resource creation.
Producer:
Using Kafka as the streaming platform, this repository relies on the `producer_coinbase.py` script to ingest real-time market data from the Coinbase WebSocket feed. Running as a local producer, the script retrieves data from the WebSocket, processes it, and publishes messages to Confluent Cloud topics. In essence, it is the link between the Coinbase feed and Confluent Cloud.
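The actual implementation is in `producer_coinbase.py`. As a hedged sketch of the same idea (not the repo's script: the topic name, channel, feed URL, and connection settings below are placeholders, and the real pipeline serializes to Avro rather than raw JSON), the producer boils down to a WebSocket subscription whose messages are forwarded to Confluent Cloud:

```python
# Minimal sketch of a Coinbase-to-Kafka producer (illustrative, not the repo's script).
import json

import websocket                      # pip install websocket-client
from confluent_kafka import Producer  # pip install confluent-kafka

KAFKA_CONF = {
    "bootstrap.servers": "<CONFLUENT_BOOTSTRAP_SERVER>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
}
TOPIC = "coinbase_avro"  # placeholder; use the topic created by Terraform

producer = Producer(KAFKA_CONF)

def on_open(ws):
    # Subscribe to a public channel for a few products (channel choice is illustrative).
    ws.send(json.dumps({
        "type": "subscribe",
        "product_ids": ["BTC-USD", "BTC-EUR"],
        "channels": ["ticker"],
    }))

def on_message(ws, message):
    event = json.loads(message)
    # Key by product_id so messages for one product land in the same partition.
    producer.produce(TOPIC, key=event.get("product_id", ""), value=message)
    producer.poll(0)  # serve delivery callbacks

if __name__ == "__main__":
    ws = websocket.WebSocketApp(
        "wss://ws-feed-public.sandbox.exchange.coinbase.com",  # sandbox feed URL
        on_open=on_open,
        on_message=on_message,
    )
    try:
        ws.run_forever()
    finally:
        producer.flush()
```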
Consumer:
No separate consumer script is needed in this setup, because Confluent's managed BigQuery Sink Connector V2 consumes the data and writes it directly to BigQuery. Visit this LINK.
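For context, this is roughly what the managed connector replaces if you were to hand-roll a consumer. A sketch only, with placeholder topic, table, and connection values, assuming JSON payloads for brevity (the real topic carries Avro, and the connector additionally handles schema mapping, batching, and retries):

```python
# Sketch of the hand-rolled consumer that the BigQuery Sink Connector makes unnecessary.
import json

from confluent_kafka import Consumer
from google.cloud import bigquery   # pip install google-cloud-bigquery

consumer = Consumer({
    "bootstrap.servers": "<CONFLUENT_BOOTSTRAP_SERVER>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
    "group.id": "coinbase-bq-writer",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["<FLATTENED_TOPIC>"])  # placeholder: the flat topic produced by ksqlDB

bq = bigquery.Client()
table_id = "<gcp_project>.<dataset>.<table>"  # placeholder BigQuery table

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        row = json.loads(msg.value())                  # JSON assumed for brevity
        errors = bq.insert_rows_json(table_id, [row])  # streaming insert
        if errors:
            print("BigQuery insert errors:", errors)
finally:
    consumer.close()
```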
Data is streamed into BigQuery tables, which are then partitioned and clustered to optimize downstream queries. Refer to `bigquery_partition.sql` for details.
Partitioning on the `TIME` column at the hourly level notably improves performance for time-based queries. Additionally, clustering by `PRODUCT_ID` keeps data within each partition logically sorted by product ID, which aligns well with `GROUP BY` clauses.
ksqlDB performs real-time transformations, enrichments, and aggregations on the incoming data streams from Coinbase. Refer to `ksqldb/transform_changes.sql` for details.
One reason for the transformation is that the data includes an attribute called "changes", which is a nested array. Nested arrays are supported by Avro on Confluent Kafka, but not yet by Avro on BigQuery, so the stream is flattened to meet BigQuery's Avro type requirements. Refer to this LINK.
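The actual queries are in `ksqldb/transform_changes.sql` (a raw stream, an exploded stream, and a flat stream). As a plain-Python illustration of the same reshaping, with field names assumed from a typical level2-style message rather than taken from the repo's schema:

```python
# Illustration only: the flattening that the ksqlDB streams perform on "changes".
# A level2-style message carries a nested array of [side, price, size] entries;
# since BigQuery's Avro support can't take the nested array, each entry becomes a flat row.
raw_event = {
    "type": "l2update",
    "product_id": "BTC-USD",
    "time": "2024-04-13T07:00:00.000000Z",
    "changes": [["buy", "10101.10", "0.45054140"],
                ["sell", "10102.55", "0.20000000"]],
}

def flatten(event):
    for side, price, size in event["changes"]:   # roughly EXPLODE(changes) in ksqlDB
        yield {
            "PRODUCT_ID": event["product_id"],
            "TIME": event["time"],
            "SIDE": side,
            "PRICE": float(price),
            "SIZE": float(size),
        }

for row in flatten(raw_event):
    print(row)   # one flat, BigQuery-friendly record per change entry
```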
My interactive Looker Studio dashboard visualizes simple analytical results after 10 hours of continuous streaming.
Please follow the steps below to reproduce the pipeline.
From the beginning to the end of the project, all required services are free!
- OS: WSL2 (Linux AMD64) on Windows 10
- Package manager: Miniconda
- Git
- BigQuery free account
- Confluent Cloud free account
- `git clone` this repo and navigate to the project directory:
  `git clone https://github.com/josephj1o4e1/kafka-project-coinbase.git`
  `cd kafka-project-coinbase`
- Create the conda environment:
  `conda env create -f environment.yml`
  `conda activate dezoom-project-reproduce`
- Create a BigQuery project
  Follow this LINK.
- Add a BigQuery API Key
  Under the project folder, create a `keys/` folder under the `terraform/` folder (`kafka-project-coinbase/terraform/keys`).
  In the GCP Cloud Console, create a service account:
  - Go to IAM & Admin -> Service Accounts -> create a new service account -> choose only the BigQuery Admin permission.
  - Click the 3-dots icon -> Manage keys -> create a new key (JSON) -> save the .json file to the `terraform/keys/` folder.
- Add a Confluent Cloud API Key
- Prepare a `secret.tfvars` file
  Copy `template_secret.tfvars` to `secret.tfvars` and start filling in the variables. This file is used for running Terraform.
  GCP:
  - `gcp_credentials`: file path of your (credential) .json file.
  - `gcp_project`: project id of your GCP project.
  Confluent Cloud:
  - `confluent_cloud_api_key`, `confluent_cloud_api_secret`: your Confluent Cloud API key and API secret.
- Run terraform (without the BigQuery connector)
  - Install Terraform if you haven't already (I use Linux AMD64): https://developer.hashicorp.com/terraform/install (use the `terraform --help` command to confirm the installation)
  - `cd terraform/`
  - `terraform init` (gets the providers)
  - `terraform plan -var-file="secret.tfvars"` (makes sure the credentials work and lets you inspect the prepared resources)
  - `terraform apply -var-file="secret.tfvars"` (takes a couple of minutes)
- Run the queries in the ksqlDB editor
  - Go to the Confluent Cloud Console. In your current cluster, go to the ksqlDB Editor tab -> run the three queries in `ksqldb/transform_changes.sql`, one at a time.
  - After that, you should have three ksqlDB streams created: `coinbase_avro`, `coinbase_avro_explode`, and `coinbase_avro_flat`. You should also have two corresponding topics created, with names suffixed by `COINBASE_AVRO_EXPLODE` and `COINBASE_AVRO_FLAT`, respectively.
- Run terraform (with the BigQuery connector)
  - Uncomment the last part of `main.tf`, which is the `confluent_connector` resource.
  - `terraform plan -var-file="secret.tfvars"`
  - `terraform apply -var-file="secret.tfvars"` (takes a couple of minutes)
- Prepare a `.env` file
  Copy `template.env` to `.env` and start filling in the variables.
  Coinbase Sandbox API:
  - `SANDBOX_API_KEY`, `SANDBOX_PASSPHRASE`, `SANDBOX_SECRET_KEY`: sign up and log into the sandbox web interface, then go to the "API" tab to create an API key (see the sketch below for how these variables can be loaded by the producer).
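As a minimal sketch of how these variables can reach the producer, assuming `python-dotenv` is available (check `environment.yml` and `producer_coinbase.py` for the actual dependency and variable handling):

```python
# Sketch: load the Coinbase sandbox credentials from .env (variable names from template.env).
import os

from dotenv import load_dotenv  # assumes python-dotenv is in the conda environment

load_dotenv()  # reads the .env file in the current directory

SANDBOX_API_KEY = os.environ["SANDBOX_API_KEY"]
SANDBOX_PASSPHRASE = os.environ["SANDBOX_PASSPHRASE"]
SANDBOX_SECRET_KEY = os.environ["SANDBOX_SECRET_KEY"]
```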
After finishing all the setup steps above:
- Simply run `python producer_coinbase.py`. Streaming begins.
  It should look something like this:
  Check if your data is sent to the BigQuery table.
- BigQuery table partitioning and clustering
  Have a look at `bigquery_partition.sql` and run the SQL query in your BigQuery project to partition and cluster the table.
  Change `TABLE_NAME` and `TABLE_NAME_PARTITIONED_CLUSTERED` to your desired table names.
  The table is partitioned by time (hour) and clustered by product_id.
  After partitioning and clustering the original table, you can compare the performance improvement like this:

      -- Performance before partitioning and clustering: processes 22.13 MB
      SELECT * FROM <TABLE_NAME>
      WHERE PRODUCT_ID = 'BTC-EUR'
        AND time BETWEEN '2024-04-13T07:00:00' AND '2024-04-13T09:00:00'
      LIMIT 1000;

      -- Performance after partitioning and clustering: processes 2.35 MB
      SELECT * FROM <TABLE_NAME_PARTITIONED_CLUSTERED>
      WHERE PRODUCT_ID = 'BTC-EUR'
        AND time BETWEEN '2024-04-13T07:00:00' AND '2024-04-13T09:00:00'
      LIMIT 1000;
- Looker Studio
  Visualize the data on Looker Studio.
  Here's the link to my simple analysis and visualization.
- Destroy all resources
  `terraform destroy -var-file="secret.tfvars"`
Feel free to provide comments, issues, or contributions to the project. Your feedback and involvement are highly valued and appreciated.