Skip to content

Commit

Permalink
Add functioning MySQL and PostgreSQL connectors
Browse files (browse the repository at this point in the history)
  • Loading branch information
pranaydotparity committed Oct 14, 2024
1 parent 118508a commit 684b19b
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 110 deletions.
24 changes: 15 additions & 9 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ chain: Hydration
wss: wss://hydradx-rpc.dwellir.com
databases:
# - type: postgres
# host: localhost
# host: 34.107.77.66
# port: 5432
# name: block_ingest_db
# user: db_user
# password: db_password
# name: dotlake_sidecar_poc
# user: pranay
# password: postgresPassword
- type: mysql
host: 35.198.89.114
port: 3306
name: dotlake_sidecar_poc
user: pranay
password: mysqlPassword
# - type: duckdb
# path: /path/to/duckdb/database
# - type: mysql
Expand All @@ -16,8 +22,8 @@ databases:
# name: block_db
# user: mysql_user
# password: mysql_password
- type: bigquery
project_id: parity-data-infra-evaluation
credentials_path: /Users/pranaypatil/Downloads/parity-data-infra-evaluation-1d7b2ec60ac2.json
dataset: pranay_playground
table: blocks_hydration
# - type: bigquery
# project_id: parity-data-infra-evaluation
# credentials_path: /Users/pranaypatil/Downloads/parity-data-infra-evaluation-1d7b2ec60ac2.json
# dataset: pranay_playground
# table: blocks_hydration
6 changes: 5 additions & 1 deletion dotlakeIngest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ DB_PORT=$(yq eval '.database.port' config.yaml)
DB_NAME=$(yq eval '.database.name' config.yaml)
DB_USER=$(yq eval '.database.user' config.yaml)
DB_PASSWORD=$(yq eval '.database.password' config.yaml)
DB_PROJECT=$(yq eval '.databases[0].project_id' config.yaml)
DB_CRED_PATH=$(yq eval '.databases[0].credentials_path' config.yaml)
DB_DATASET=$(yq eval '.databases[0].dataset' config.yaml)
DB_TABLE=$(yq eval '.databases[0].table' config.yaml)

# Start Substrate API Sidecar
echo "Starting Substrate API Sidecar..."
Expand All @@ -39,7 +43,7 @@ docker run -d --rm \
-e DB_NAME="$DB_NAME" \
-e DB_USER="$DB_USER" \
-e DB_PASSWORD="$DB_PASSWORD" \
-p 8501:8501 eu.gcr.io/parity-data-infra-evaluation/block-ingest:0.2
-p 8501:8501 eu.gcr.io/parity-data-infra-evaluation/block-ingest:0.2.1
if [ $? -eq 0 ]; then
echo "Block Ingest Service started successfully."
else
Expand Down
33 changes: 25 additions & 8 deletions dotlakeIngest_local_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@ WSS=$(yq eval '.wss' config.yaml)

# Database configuration
DB_TYPE=$(yq eval '.databases[0].type' config.yaml)
DB_HOST=$(yq eval '.databases[0].host' config.yaml)
DB_PORT=$(yq eval '.databases[0].port' config.yaml)
DB_NAME=$(yq eval '.databases[0].name' config.yaml)
DB_USER=$(yq eval '.databases[0].user' config.yaml)
DB_PASSWORD=$(yq eval '.databases[0].password' config.yaml)
DB_PROJECT=$(yq eval '.databases[0].project_id' config.yaml)
DB_CRED_PATH=$(yq eval '.databases[0].credentials_path' config.yaml)
DB_DATASET=$(yq eval '.databases[0].dataset' config.yaml)
DB_TABLE=$(yq eval '.databases[0].table' config.yaml)

# Start Substrate API Sidecar
echo "Chain: $CHAIN"
echo "Relay Chain: $RELAY_CHAIN"
echo "WebSocket URL: $WSS"
echo "Database Type: $DB_TYPE"
echo "Database Path: $DB_PATH"
echo "Database Project: $DB_PROJECT"
echo "Database Credentials Path: $DB_CRED_PATH"
echo "Database Dataset: $DB_DATASET"
echo "Database Table: $DB_TABLE"
echo "Database Host: $DB_HOST"
echo "Database Port: $DB_PORT"
echo "Database User: $DB_USER"
echo "Database Password: $DB_PASSWORD"
echo "Database Name: $DB_NAME"


# # Start Substrate API Sidecar
# echo "Starting Substrate API Sidecar..."
# docker run -d --rm --read-only -e SAS_SUBSTRATE_URL="$WSS" -p 8080:8080 parity/substrate-api-sidecar:latest
# if [ $? -eq 0 ]; then
Expand All @@ -28,21 +49,17 @@ DB_TABLE=$(yq eval '.databases[0].table' config.yaml)
# exit 1
# fi


# Default values
DB_PATH="blocks.db"

cd ingest/

# Start the main.py script
echo "Starting main.py script..."
/usr/bin/python3 main.py --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --wss "$WSS" --db_path "$DB_PATH" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" &
/usr/bin/python3 main.py --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --wss "$WSS" --db_path "$DB_PATH" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" &

sleep 30
sleep 15

# Start the Streamlit app
# echo "Starting Streamlit app..."
# /usr/bin/python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" &
# /usr/bin/python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" &

# # Wait for all background processes to finish
# wait
Expand Down
50 changes: 43 additions & 7 deletions ingest/Home.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import argparse
import duckdb
import pandas as pd
import json
import time
from datetime import datetime
from streamlit_autorefresh import st_autorefresh
Expand All @@ -14,12 +15,19 @@ def parse_arguments():
parser = argparse.ArgumentParser(description="Block ingestion script for Substrate-based chains")
parser.add_argument("--chain", required=True, help="Name of the chain to process")
parser.add_argument("--relay_chain", required=True, help="Name of the relay chain")
parser.add_argument("--database", required=True, help="Name of the database")
parser.add_argument("--backfill", action="store_true", help="Enable backfill mode")
parser.add_argument("--live", action="store_true", help="Enable live ingestion mode")
parser.add_argument("--database", help="Name of the database")
parser.add_argument("--db_path")
parser.add_argument("--db_project")
parser.add_argument("--db_cred_path")
parser.add_argument("--db_dataset")
parser.add_argument("--db_table")
parser.add_argument("--db_host", required=False, help="Database host")
parser.add_argument("--db_port", required=False, type=int, help="Database port")
parser.add_argument("--db_user", required=False, help="Database user")
parser.add_argument("--db_password", required=False, help="Database password")
parser.add_argument("--db_name", required=False, help="Database name")
return parser.parse_args()


Expand Down Expand Up @@ -49,6 +57,12 @@ def parse_arguments():
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'mysql':
from mysql_utils import connect_to_mysql, close_connection, query_block_data
db_connection = connect_to_mysql(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query_block_data(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'duckdb':
from duckdb_utils import connect_to_db, close_connection, query
db_connection = connect_to_db(args.db_path)
Expand Down Expand Up @@ -85,15 +99,37 @@ def parse_arguments():
extrinsics_col, events_col = st.columns(2)

# Display the number of extrinsics in the most recent block
num_extrinsics = len(recent_blocks['extrinsics'].iloc[0])
if args.database == 'postgres' or args.database == 'mysql':
num_extrinsics = len(json.loads(recent_blocks['extrinsics'].iloc[0]))
elif args.database == 'duckdb':
num_extrinsics = len(recent_blocks['extrinsics'].iloc[0])
elif args.database == 'bigquery':
num_extrinsics = len(json.loads(recent_blocks['extrinsics'].iloc[0]))
else:
num_extrinsics = 0 # Default value if database type is not recognized
extrinsics_col.metric("Extrinsics", num_extrinsics)

# Calculate the total number of events in the most recent block
num_events = (
len(recent_blocks['onFinalize'].iloc[0]['events']) +
len(recent_blocks['onInitialize'].iloc[0]['events']) +
sum(len(ex['events']) for ex in recent_blocks['extrinsics'].iloc[0])
)
if args.database == 'postgres' or args.database == 'mysql':
num_events = (
len(json.loads(recent_blocks['onFinalize'].iloc[0])['events']) +
len(json.loads(recent_blocks['onInitialize'].iloc[0])['events']) +
sum(len(ex['events']) for ex in json.loads(recent_blocks['extrinsics'].iloc[0]))
)
elif args.database == 'duckdb':
num_events = (
len(recent_blocks['onFinalize'].iloc[0]['events']) +
len(recent_blocks['onInitialize'].iloc[0]['events']) +
sum(len(ex['events']) for ex in recent_blocks['extrinsics'].iloc[0])
)
elif args.database == 'bigquery':
num_events = (
len(json.loads(recent_blocks['onFinalize'].iloc[0])['events']) +
len(json.loads(recent_blocks['onInitialize'].iloc[0])['events']) +
sum(len(ex['events']) for ex in json.loads(recent_blocks['extrinsics'].iloc[0]))
)
else:
num_events = 0 # Default value if database type is not recognized

# Display the total number of events
events_col.metric("Events", num_events)
Expand Down
31 changes: 28 additions & 3 deletions ingest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def parse_arguments():
parser.add_argument("--db_cred_path")
parser.add_argument("--db_dataset")
parser.add_argument("--db_table")
parser.add_argument("--db_host", required=False, help="Database host")
parser.add_argument("--db_port", required=False, type=int, help="Database port")
parser.add_argument("--db_user", required=False, help="Database user")
parser.add_argument("--db_password", required=False, help="Database password")
parser.add_argument("--db_name", required=False, help="Database name")
return parser.parse_args()


Expand All @@ -32,15 +37,29 @@ def main():
'database_dataset': args.db_dataset,
'database_table': args.db_table,
'database_cred_path': args.db_cred_path,
'database_path': args.db_path
'database_path': args.db_path,
'database_host': args.db_host,
'database_port': args.db_port,
'database_user': args.db_user,
'database_password': args.db_password,
'database_name': args.db_name
}

# Connect to the database
if args.database == 'postgres':
from postgres_utils import connect_to_postgres, close_connection, create_blocks_table
from postgres_utils import connect_to_postgres, close_connection, create_tables
db_connection = connect_to_postgres(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
create_blocks_table(db_connection, args.chain, args.relay_chain)
create_tables(db_connection, args.chain, args.relay_chain)
close_connection(db_connection)
elif args.database == 'mysql':
from mysql_utils import connect_to_mysql, close_connection, create_tables
db_connection = connect_to_mysql(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
if db_connection:
create_tables(db_connection, args.chain, args.relay_chain)
close_connection(db_connection)
print(f"Connected to MySQL and created tables for {args.chain} on {args.relay_chain}")
else:
print("Failed to connect to MySQL database")
elif args.database == 'duckdb':
from duckdb_utils import connect_to_db, close_connection, create_blocks_table
db_connection = connect_to_db(args.db_path)
Expand Down Expand Up @@ -97,6 +116,12 @@ def main():
fetch_last_block_query = f"SELECT number FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 1"
df = query(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'mysql':
from mysql_utils import connect_to_mysql, close_connection, query_block_data
db_connection = connect_to_mysql(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
fetch_last_block_query = f"SELECT number FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 1"
df = query_block_data(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'duckdb':
from duckdb_utils import connect_to_db, close_connection, query
db_connection = connect_to_db(args.db_path)
Expand Down
Loading

0 comments on commit 684b19b

Please sign in to comment.