Skip to content

Commit

Permalink
Add Superset
Browse files Browse the repository at this point in the history
  • Loading branch information
pranaydotparity committed Oct 22, 2024
1 parent 684b19b commit 1d242a4
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 148 deletions.
27 changes: 27 additions & 0 deletions cleanup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash

# Stop and remove all containers created by dotlakeIngest.sh.
echo "Stopping and removing Docker containers..."

# Containers started by dotlakeIngest.sh (sidecar, ingest service, Superset).
containers_to_remove=("dotlake_sidecar_instance" "dotlake_ingest" "superset")

for container in "${containers_to_remove[@]}"; do
    # `docker ps -aq --filter name=...` prints matching container ids;
    # `grep -q .` succeeds only if at least one id was printed.
    if docker ps -aq --filter "name=$container" | grep -q .; then
        docker stop "$container"
        # Containers started with --rm auto-remove on stop, so a follow-up
        # `docker rm` may legitimately find nothing; don't treat that as fatal.
        docker rm "$container" 2>/dev/null || true
        echo "Container $container stopped and removed successfully."
    else
        echo "Container $container not found."
    fi
done

# Verify cleanup: success means NO matching containers remain.
# Multiple --filter name= flags are OR'ed together by docker.
if docker ps -aq --filter "name=dotlake_sidecar_instance" --filter "name=dotlake_ingest" --filter "name=superset" | grep -q .; then
    echo "Warning: some of the specified Docker containers are still present."
else
    echo "Specified Docker containers stopped and removed successfully."
fi


echo "Cleanup complete."
9 changes: 9 additions & 0 deletions config-sample/config-bigquery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
relay_chain: Polkadot
chain: Hydration
wss: wss://hydradx-rpc.dwellir.com
databases:
- type: bigquery
project_id: your-gcp-project-id
credentials_path: /path/to/service-account-credentials.json
dataset: your_dataset
table: blocks_hydration
10 changes: 10 additions & 0 deletions config-sample/config-mysql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
relay_chain: Polkadot
chain: Hydration
wss: wss://hydradx-rpc.dwellir.com
databases:
- type: mysql
host: xx.xx.xx.xx
port: 3306
name: dotlake_sidecar_poc
user: *****
password: ******
10 changes: 10 additions & 0 deletions config-sample/config-postgresql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
relay_chain: Polkadot
chain: Hydration
wss: wss://hydradx-rpc.dwellir.com
databases:
- type: postgres
host: xx.xx.xx.xx
port: 5432
name: dotlake_sidecar_poc
user: ******
password: *****
17 changes: 0 additions & 17 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@ relay_chain: Polkadot
chain: Hydration
wss: wss://hydradx-rpc.dwellir.com
databases:
# - type: postgres
# host: 34.107.77.66
# port: 5432
# name: dotlake_sidecar_poc
# user: pranay
# password: postgresPassword
- type: mysql
host: 35.198.89.114
port: 3306
Expand All @@ -16,14 +10,3 @@ databases:
password: mysqlPassword
# - type: duckdb
# path: /path/to/duckdb/database
# - type: mysql
# host: mysql_host
# port: 3306
# name: block_db
# user: mysql_user
# password: mysql_password
# - type: bigquery
# project_id: parity-data-infra-evaluation
# credentials_path: /Users/pranaypatil/Downloads/parity-data-infra-evaluation-1d7b2ec60ac2.json
# dataset: pranay_playground
# table: blocks_hydration
38 changes: 32 additions & 6 deletions dotlakeIngest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ CHAIN=$(yq eval '.chain' config.yaml)
WSS=$(yq eval '.wss' config.yaml)

# Database configuration
DB_HOST=$(yq eval '.database.host' config.yaml)
DB_PORT=$(yq eval '.database.port' config.yaml)
DB_NAME=$(yq eval '.database.name' config.yaml)
DB_USER=$(yq eval '.database.user' config.yaml)
DB_PASSWORD=$(yq eval '.database.password' config.yaml)
DB_TYPE=$(yq eval '.databases[0].type' config.yaml)
DB_HOST=$(yq eval '.databases[0].host' config.yaml)
DB_PORT=$(yq eval '.databases[0].port' config.yaml)
DB_NAME=$(yq eval '.databases[0].name' config.yaml)
DB_USER=$(yq eval '.databases[0].user' config.yaml)
DB_PASSWORD=$(yq eval '.databases[0].password' config.yaml)
DB_PROJECT=$(yq eval '.databases[0].project_id' config.yaml)
DB_CRED_PATH=$(yq eval '.databases[0].credentials_path' config.yaml)
DB_DATASET=$(yq eval '.databases[0].dataset' config.yaml)
DB_TABLE=$(yq eval '.databases[0].table' config.yaml)

# Start Substrate API Sidecar
echo "Starting Substrate API Sidecar..."
docker run -d --rm --read-only -e SAS_SUBSTRATE_URL="$WSS" -p 8080:8080 parity/substrate-api-sidecar:latest
docker run -d --rm --read-only -e SAS_SUBSTRATE_URL="$WSS" -p 8080:8080 --name dotlake_sidecar_instance parity/substrate-api-sidecar:latest
if [ $? -eq 0 ]; then
echo "Substrate API Sidecar started successfully."
else
Expand All @@ -35,9 +36,11 @@ fi
# Start Block Ingest Service
echo "Starting Block Ingest Service..."
docker run -d --rm \
--name dotlake_ingest \
-e CHAIN="$CHAIN" \
-e RELAY_CHAIN="$RELAY_CHAIN" \
-e WSS="$WSS" \
-e DB_TYPE="$DB_TYPE" \
-e DB_HOST="$DB_HOST" \
-e DB_PORT="$DB_PORT" \
-e DB_NAME="$DB_NAME" \
Expand All @@ -54,3 +57,26 @@ fi
echo "Starting the services....this will take a minute...."
sleep 60
echo "Both services are now running. You can access Substrate API Sidecar at http://localhost:8080 and Block Ingest Service at http://localhost:8501"

# Create SQLAlchemy URI for Postgres or MySQL
if [[ $(yq eval '.databases[0].type' config.yaml) == "postgres" ]]; then
SQLALCHEMY_URI="postgres+psycopg2://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}"
elif [[ $(yq eval '.databases[0].type' config.yaml) == "mysql" ]]; then
SQLALCHEMY_URI="mysql+mysqldb://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}"
else
echo "Unsupported database type. Only postgres and mysql are supported."
exit 1
fi

echo "SQLAlchemy URI created: ${SQLALCHEMY_URI}"

# Start Superset locally
echo "Starting Superset..."
cd superset-local
sh start_superset.sh "$SQLALCHEMY_URI"
if [ $? -eq 0 ]; then
echo "Superset started successfully."
else
echo "Failed to start Superset."
exit 1
fi
4 changes: 2 additions & 2 deletions dotlakeIngest_local_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ echo "Starting main.py script..."
sleep 15

# Start the Streamlit app
# echo "Starting Streamlit app..."
# /usr/bin/python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" &
echo "Starting Streamlit app..."
/usr/bin/python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" --db_host "$DB_HOST" --db_port "$DB_PORT" --db_user "$DB_USER" --db_password "$DB_PASSWORD" --db_name "$DB_NAME" &

# # Wait for all background processes to finish
# wait
Expand Down
42 changes: 19 additions & 23 deletions ingest/Home.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
from datetime import datetime
from streamlit_autorefresh import st_autorefresh
from database_utils import connect_to_database, close_connection, query_recent_blocks

# Function to connect to DuckDB
def connect_to_db(db_path='blocks.db'):
Expand Down Expand Up @@ -33,6 +34,20 @@ def parse_arguments():

args = parse_arguments()

database_info = {
'database': args.database,
'database_project': args.db_project,
'database_dataset': args.db_dataset,
'database_table': args.db_table,
'database_cred_path': args.db_cred_path,
'database_path': args.db_path,
'database_host': args.db_host,
'database_port': args.db_port,
'database_user': args.db_user,
'database_password': args.db_password,
'database_name': args.db_name
}

st.set_page_config(
page_title="Dotlake Explorer",
page_icon="🚀",
Expand All @@ -51,29 +66,10 @@ def parse_arguments():

try:

if args.database == 'postgres':
from postgres_utils import connect_to_postgres, close_connection, query
db_connection = connect_to_postgres(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'mysql':
from mysql_utils import connect_to_mysql, close_connection, query_block_data
db_connection = connect_to_mysql(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query_block_data(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'duckdb':
from duckdb_utils import connect_to_db, close_connection, query
db_connection = connect_to_db(args.db_path)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'bigquery':
from bigquery_utils import connect_to_bigquery, query
db_connection = connect_to_bigquery(args.db_project, args.db_cred_path)
fetch_last_block_query = f"SELECT * FROM {args.db_dataset}.{args.db_table} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)
db_connection = connect_to_database(database_info)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query_recent_blocks(db_connection, database_info, chain, relay_chain)
close_connection(db_connection, database_info)

recent_blocks['timestamp'] = recent_blocks['timestamp'].apply(lambda x: datetime.fromtimestamp(x/1000).strftime("%Y-%m-%d %H:%M:%S") )

Expand Down
125 changes: 125 additions & 0 deletions ingest/database_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Dict, Any

def connect_to_database(database_info: Dict[str, Any]):
    """Open and return a connection to the configured database backend.

    Dispatches on ``database_info['database']`` (one of ``'postgres'``,
    ``'mysql'``, ``'duckdb'``, ``'bigquery'``) and lazily imports the
    matching helper module so unused backends need not be installed.

    Raises:
        ValueError: if the database type is not a supported backend.
    """
    db_type = database_info['database']

    if db_type == 'postgres':
        from postgres_utils import connect_to_postgres
        connector = connect_to_postgres
        conn_args = (
            database_info['database_host'],
            database_info['database_port'],
            database_info['database_name'],
            database_info['database_user'],
            database_info['database_password'],
        )
    elif db_type == 'mysql':
        from mysql_utils import connect_to_mysql
        connector = connect_to_mysql
        conn_args = (
            database_info['database_host'],
            database_info['database_port'],
            database_info['database_name'],
            database_info['database_user'],
            database_info['database_password'],
        )
    elif db_type == 'duckdb':
        from duckdb_utils import connect_to_db
        connector = connect_to_db
        conn_args = (database_info['database_path'],)
    elif db_type == 'bigquery':
        from bigquery_utils import connect_to_bigquery
        connector = connect_to_bigquery
        conn_args = (database_info['database_project'], database_info['database_cred_path'])
    else:
        raise ValueError(f"Unsupported database type: {db_type}")

    return connector(*conn_args)

def create_tables(db_connection, database_info: Dict[str, Any], chain: str, relay_chain: str):
    """Create the blocks table for the configured database backend.

    For postgres/mysql/duckdb the table name is derived from *chain* and
    *relay_chain*; for BigQuery the dataset and table come from
    ``database_info`` instead.

    Raises:
        ValueError: if the database type is not a supported backend.
    """
    db_type = database_info['database']

    if db_type == 'postgres':
        from postgres_utils import create_tables as _create
        _create(db_connection, chain, relay_chain)
    elif db_type == 'mysql':
        from mysql_utils import create_tables as _create
        _create(db_connection, chain, relay_chain)
    elif db_type == 'duckdb':
        from duckdb_utils import create_blocks_table as _create
        _create(db_connection, chain, relay_chain)
    elif db_type == 'bigquery':
        from bigquery_utils import create_blocks_table as _create
        _create(db_connection, database_info['database_dataset'], database_info['database_table'])
    else:
        raise ValueError(f"Unsupported database type: {db_type}")


def insert_block_data(database_info, db_connection, block_data, chain_name, relay_chain):
    """Insert a single block record into the configured database backend.

    Note: for postgres/mysql/duckdb the connection is closed after the
    insert, so callers must reconnect before reusing ``db_connection``;
    BigQuery clients need no explicit close.

    Raises:
        ValueError: if the database type is not a supported backend
            (previously an unknown type was a silent no-op, inconsistent
            with the other helpers in this module).
    """
    db_type = database_info['database']

    if db_type == 'postgres':
        # Import only what is used (the unused connect_to_* import is gone).
        from postgres_utils import close_connection, insert_block_data as _insert
        _insert(db_connection, block_data, chain_name, relay_chain)
        close_connection(db_connection)
    elif db_type == 'mysql':
        from mysql_utils import close_connection, insert_block_data as _insert
        _insert(db_connection, block_data, chain_name, relay_chain)
        close_connection(db_connection)
    elif db_type == 'duckdb':
        from duckdb_utils import close_connection, insert_block
        insert_block(db_connection, block_data)
        close_connection(db_connection)
    elif db_type == 'bigquery':
        from bigquery_utils import insert_block
        insert_block(db_connection, database_info['database_dataset'], database_info['database_table'], block_data)
    else:
        raise ValueError(f"Unsupported database type: {db_type}")


def close_connection(db_connection, database_info: Dict[str, Any]):
    """Close *db_connection* using the configured backend's helper.

    BigQuery clients require no explicit close, so that branch is a no-op.

    Raises:
        ValueError: if the database type is not a supported backend.
    """
    db_type = database_info['database']

    # Call each backend's close helper directly instead of the previous
    # locals()-keyed string lookup, which was fragile and unidiomatic.
    if db_type == 'postgres':
        from postgres_utils import close_connection as _close
        _close(db_connection)
    elif db_type == 'mysql':
        from mysql_utils import close_connection as _close
        _close(db_connection)
    elif db_type == 'duckdb':
        from duckdb_utils import close_connection as _close
        _close(db_connection)
    elif db_type == 'bigquery':
        # BigQuery doesn't require explicit connection closing.
        pass
    else:
        raise ValueError(f"Unsupported database type: {db_type}")

def query_last_block(db_connection, database_info: Dict[str, Any], chain: str, relay_chain: str, block_num = None):
    """Fetch the highest-numbered block, or a specific block by number.

    When *block_num* is None the single most recent block is returned;
    otherwise the block with that exact number. For BigQuery the table is
    addressed via the configured dataset/table; other backends use the
    ``blocks_<relay_chain>_<chain>`` naming convention.

    Raises:
        ValueError: if the database type is unsupported, or *block_num*
            is not integer-like (the int() coercion prevents SQL injection
            through the interpolated value).
    """
    db_type = database_info['database']

    if db_type == 'postgres':
        from postgres_utils import query
    elif db_type == 'mysql':
        from mysql_utils import query_block_data as query
    elif db_type == 'duckdb':
        from duckdb_utils import query
    elif db_type == 'bigquery':
        from bigquery_utils import query
    else:
        raise ValueError(f"Unsupported database type: {db_type}")

    if db_type == 'bigquery':
        table_ref = f"{database_info['database_dataset']}.{database_info['database_table']}"
    else:
        table_ref = f"blocks_{relay_chain}_{chain}"

    if block_num is None:
        fetch_last_block_query = f"SELECT * FROM {table_ref} ORDER BY number DESC LIMIT 1"
    else:
        # Coerce to int so the value interpolated into the SQL text cannot
        # smuggle arbitrary SQL; identical behavior for integer inputs.
        fetch_last_block_query = f"SELECT * FROM {table_ref} WHERE number={int(block_num)} LIMIT 1"

    return query(db_connection, fetch_last_block_query)

def query_recent_blocks(db_connection, database_info: Dict[str, Any], chain: str, relay_chain: str):
    """Return the 50 most recent blocks, newest first.

    For BigQuery the configured dataset/table is queried; other backends
    use the ``blocks_<relay_chain>_<chain>`` table naming convention.

    Raises:
        ValueError: if the database type is not a supported backend.
    """
    db_type = database_info['database']

    if db_type == 'postgres':
        from postgres_utils import query
    elif db_type == 'mysql':
        from mysql_utils import query_block_data as query
    elif db_type == 'duckdb':
        from duckdb_utils import query
    elif db_type == 'bigquery':
        from bigquery_utils import query
    else:
        raise ValueError(f"Unsupported database type: {db_type}")

    if db_type == 'bigquery':
        table_ref = f"{database_info['database_dataset']}.{database_info['database_table']}"
    else:
        table_ref = f"blocks_{relay_chain}_{chain}"

    recent_blocks_query = f"SELECT * FROM {table_ref} ORDER BY number DESC LIMIT 50"
    return query(db_connection, recent_blocks_query)
Loading

0 comments on commit 1d242a4

Please sign in to comment.