Add new database connectors
pranaydotparity committed Sep 24, 2024
1 parent 4edcd36 commit 118508a
Showing 12 changed files with 677 additions and 165 deletions.
23 changes: 23 additions & 0 deletions config.yaml
@@ -0,0 +1,23 @@
relay_chain: Polkadot
chain: Hydration
wss: wss://hydradx-rpc.dwellir.com
databases:
# - type: postgres
# host: localhost
# port: 5432
# name: block_ingest_db
# user: db_user
# password: db_password
# - type: duckdb
# path: /path/to/duckdb/database
# - type: mysql
# host: mysql_host
# port: 3306
# name: block_db
# user: mysql_user
# password: mysql_password
- type: bigquery
project_id: parity-data-infra-evaluation
credentials_path: /Users/pranaypatil/Downloads/parity-data-infra-evaluation-1d7b2ec60ac2.json
dataset: pranay_playground
table: blocks_hydration
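The databases key holds a list of connector configurations; the ingest scripts below read only the first entry. As an illustrative sketch (not part of this commit), the same selection could be done in Python with PyYAML, dispatching on the entry's type; the load_first_database helper is hypothetical:

# Illustrative sketch (not part of this commit): read config.yaml with PyYAML
# and dispatch on the first entry of the `databases` list.
import yaml

def load_first_database(config_path="config.yaml"):
    with open(config_path) as f:
        config = yaml.safe_load(f)
    db = config["databases"][0]
    if db["type"] == "bigquery":
        # BigQuery entries carry project_id, credentials_path, dataset and table
        return db["type"], {k: db[k] for k in ("project_id", "credentials_path", "dataset", "table")}
    if db["type"] in ("postgres", "mysql"):
        # SQL-style entries carry host, port, name, user and password
        return db["type"], {k: db[k] for k in ("host", "port", "name", "user", "password")}
    if db["type"] == "duckdb":
        return db["type"], {"path": db["path"]}
    raise ValueError(f"Unsupported database type: {db['type']}")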
55 changes: 25 additions & 30 deletions dotlakeIngest.sh
@@ -1,37 +1,23 @@
#!/bin/bash

# Parse command line arguments
while [[ $# -gt 0 ]]; do
key="$1"
case $key in
--chain)
CHAIN="$2"
shift
shift
;;
--relay-chain)
RELAY_CHAIN="$2"
shift
shift
;;
--wss)
WSS="$2"
shift
shift
;;
*)
echo "Unknown option: $1"
exit 1
;;
esac
done

# Check if required arguments are provided
if [ -z "$CHAIN" ] || [ -z "$RELAY_CHAIN" ] || [ -z "$WSS" ]; then
echo "Usage: $0 --chain <chain> --relay-chain <relay_chain> --wss <wss_endpoint>"
# Check if yq is installed
if ! command -v yq &> /dev/null; then
echo "yq is not installed. Please install it to parse YAML files."
exit 1
fi

# Read configuration from config.yaml
RELAY_CHAIN=$(yq eval '.relay_chain' config.yaml)
CHAIN=$(yq eval '.chain' config.yaml)
WSS=$(yq eval '.wss' config.yaml)

# Database configuration
DB_HOST=$(yq eval '.databases[0].host' config.yaml)
DB_PORT=$(yq eval '.databases[0].port' config.yaml)
DB_NAME=$(yq eval '.databases[0].name' config.yaml)
DB_USER=$(yq eval '.databases[0].user' config.yaml)
DB_PASSWORD=$(yq eval '.databases[0].password' config.yaml)

# Start Substrate API Sidecar
echo "Starting Substrate API Sidecar..."
docker run -d --rm --read-only -e SAS_SUBSTRATE_URL="$WSS" -p 8080:8080 parity/substrate-api-sidecar:latest
@@ -44,7 +30,16 @@ fi

# Start Block Ingest Service
echo "Starting Block Ingest Service..."
docker run -d --rm -e CHAIN="$CHAIN" -e RELAY_CHAIN="$RELAY_CHAIN" -e WSS="$WSS" -p 8501:8501 eu.gcr.io/parity-data-infra-evaluation/block-ingest:0.2
docker run -d --rm \
-e CHAIN="$CHAIN" \
-e RELAY_CHAIN="$RELAY_CHAIN" \
-e WSS="$WSS" \
-e DB_HOST="$DB_HOST" \
-e DB_PORT="$DB_PORT" \
-e DB_NAME="$DB_NAME" \
-e DB_USER="$DB_USER" \
-e DB_PASSWORD="$DB_PASSWORD" \
-p 8501:8501 eu.gcr.io/parity-data-infra-evaluation/block-ingest:0.2
if [ $? -eq 0 ]; then
echo "Block Ingest Service started successfully."
else
52 changes: 52 additions & 0 deletions dotlakeIngest_local_test.sh
@@ -0,0 +1,52 @@
#!/bin/bash

# Check if yq is installed
if ! command -v yq &> /dev/null; then
echo "yq is not installed. Please install it to parse YAML files."
exit 1
fi

# Read configuration from config.yaml
RELAY_CHAIN=$(yq eval '.relay_chain' config.yaml)
CHAIN=$(yq eval '.chain' config.yaml)
WSS=$(yq eval '.wss' config.yaml)

# Database configuration
DB_TYPE=$(yq eval '.databases[0].type' config.yaml)
DB_PROJECT=$(yq eval '.databases[0].project_id' config.yaml)
DB_CRED_PATH=$(yq eval '.databases[0].credentials_path' config.yaml)
DB_DATASET=$(yq eval '.databases[0].dataset' config.yaml)
DB_TABLE=$(yq eval '.databases[0].table' config.yaml)

# Start Substrate API Sidecar
# echo "Starting Substrate API Sidecar..."
# docker run -d --rm --read-only -e SAS_SUBSTRATE_URL="$WSS" -p 8080:8080 parity/substrate-api-sidecar:latest
# if [ $? -eq 0 ]; then
# echo "Substrate API Sidecar started successfully."
# else
# echo "Failed to start Substrate API Sidecar."
# exit 1
# fi


# Default values
DB_PATH="blocks.db"

cd ingest/

# Start the main.py script
echo "Starting main.py script..."
/usr/bin/python3 main.py --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --wss "$WSS" --db_path "$DB_PATH" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" &

sleep 30

# Start the Streamlit app
# echo "Starting Streamlit app..."
# /usr/bin/python3 -m streamlit run Home.py --server.port 8501 -- --db_path "$DB_PATH" --chain "$CHAIN" --relay_chain "$RELAY_CHAIN" --database "$DB_TYPE" --db_project "$DB_PROJECT" --db_cred_path "$DB_CRED_PATH" --db_dataset "$DB_DATASET" --db_table "$DB_TABLE" &

# # Wait for all background processes to finish
# wait

# echo "Starting the services....this will take a minute...."
# sleep 60
# echo "Both services are now running. You can access Substrate API Sidecar at http://localhost:8080 and Block Ingest Service at http://localhost:8501"
47 changes: 30 additions & 17 deletions ingest/Home.py
@@ -12,9 +12,14 @@ def connect_to_db(db_path='blocks.db'):

def parse_arguments():
parser = argparse.ArgumentParser(description="Block ingestion script for Substrate-based chains")
parser.add_argument("--chain", required=True)
parser.add_argument("--relay_chain", required=True)
parser.add_argument("--db_path", required=True)
parser.add_argument("--chain", required=True, help="Name of the chain to process")
parser.add_argument("--relay_chain", required=True, help="Name of the relay chain")
parser.add_argument("--database", required=True, help="Name of the database")
parser.add_argument("--db_path")
parser.add_argument("--db_project")
parser.add_argument("--db_cred_path")
parser.add_argument("--db_dataset")
parser.add_argument("--db_table")
return parser.parse_args()


@@ -27,7 +32,7 @@ def parse_arguments():

# Run the autorefresh about every 30000 milliseconds (30 seconds) and stop
# after it's been refreshed 100 times.
# count = st_autorefresh(interval=30000, limit=100, key="fizzbuzzcounter")
count = st_autorefresh(interval=30000, limit=100, key="fizzbuzzcounter")

placeholder = st.empty()

@@ -37,17 +42,26 @@ def parse_arguments():
with placeholder.container():

try:
# Connect to DuckDB
conn = connect_to_db(args.db_path)

query = f"""
SELECT *
FROM blocks_{relay_chain}_{chain}
ORDER BY number DESC
LIMIT 50
"""
recent_blocks = conn.execute(query).fetchdf()
recent_blocks['timestamp'] = recent_blocks['timestamp'].apply(lambda x: datetime.fromtimestamp(x).strftime("%Y-%m-%d %H:%M:%S") )

if args.database == 'postgres':
from postgres_utils import connect_to_postgres, close_connection, query
db_connection = connect_to_postgres(args.db_host, args.db_port, args.db_name, args.db_user, args.db_password)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'duckdb':
from duckdb_utils import connect_to_db, close_connection, query
db_connection = connect_to_db(args.db_path)
fetch_last_block_query = f"SELECT * FROM blocks_{args.relay_chain}_{args.chain} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)
close_connection(db_connection)
elif args.database == 'bigquery':
from bigquery_utils import connect_to_bigquery, query
db_connection = connect_to_bigquery(args.db_project, args.db_cred_path)
fetch_last_block_query = f"SELECT * FROM {args.db_dataset}.{args.db_table} ORDER BY number DESC LIMIT 50"
recent_blocks = query(db_connection, fetch_last_block_query)

recent_blocks['timestamp'] = recent_blocks['timestamp'].apply(lambda x: datetime.fromtimestamp(x/1000).strftime("%Y-%m-%d %H:%M:%S") )

# Streamlit app
st.title("Dotlake Explorer")
@@ -89,8 +103,7 @@ def parse_arguments():

st.dataframe(recent_blocks[['number', 'timestamp', 'hash', 'finalized']])

# Close the connection
conn.close()
except Exception as e:
st.error("Oops...something went wrong, please refresh the page")
st.error(e)
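
Home.py imports connect_to_db, close_connection, and query from duckdb_utils (and analogous helpers from postgres_utils), which are not shown in this diff. A minimal sketch of the DuckDB-side interface the code above assumes, written here only for illustration and possibly differing from the actual module, would be:

# Hypothetical sketch of the duckdb_utils interface assumed by Home.py;
# the real module in the repository may differ.
import duckdb

def connect_to_db(db_path="blocks.db"):
    """Open (or create) a DuckDB database file and return the connection."""
    return duckdb.connect(db_path)

def query(conn, query_str):
    """Run a SQL statement and return the result as a pandas DataFrame."""
    return conn.execute(query_str).fetchdf()

def close_connection(conn):
    """Close the DuckDB connection."""
    conn.close()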

168 changes: 168 additions & 0 deletions ingest/bigquery_utils.py
@@ -0,0 +1,168 @@
import os
from google.cloud import bigquery
from google.oauth2 import service_account

def connect_to_bigquery(project_id, credentials_path):
"""
Connect to BigQuery.
Args:
project_id (str): The Google Cloud project ID.
credentials_path (str): Path to the service account credentials JSON file.
Returns:
google.cloud.bigquery.client.Client: A BigQuery client.
"""
if not os.path.exists(credentials_path):
raise FileNotFoundError(f"Credentials file not found at {credentials_path}")

credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

return bigquery.Client(credentials=credentials, project=project_id)

def create_blocks_table(client, dataset_id, table_id):
"""
Create the blocks table if it doesn't exist.
Args:
client (google.cloud.bigquery.client.Client): A BigQuery client.
dataset_id (str): The ID of the dataset to create the table in.
table_id (str): The ID of the table to create.
"""
schema = [
bigquery.SchemaField("relay_chain", "STRING", mode="REQUIRED"),
bigquery.SchemaField("chain", "STRING", mode="REQUIRED"),
bigquery.SchemaField("timestamp", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("number", "STRING", mode="REQUIRED"),
bigquery.SchemaField("hash", "STRING", mode="REQUIRED"),
bigquery.SchemaField("parentHash", "STRING", mode="REQUIRED"),
bigquery.SchemaField("stateRoot", "STRING", mode="REQUIRED"),
bigquery.SchemaField("extrinsicsRoot", "STRING", mode="REQUIRED"),
bigquery.SchemaField("authorId", "STRING", mode="REQUIRED"),
bigquery.SchemaField("finalized", "BOOLEAN", mode="REQUIRED"),
bigquery.SchemaField("extrinsics", "RECORD", mode="REPEATED", fields=[
bigquery.SchemaField("method", "RECORD", fields=[
bigquery.SchemaField("pallet", "STRING", mode="REQUIRED"),
bigquery.SchemaField("method", "STRING", mode="REQUIRED")
]),
bigquery.SchemaField("signature", "RECORD", fields=[
bigquery.SchemaField("signature", "STRING"),
bigquery.SchemaField("signer", "STRING")
]),
bigquery.SchemaField("nonce", "STRING"),
bigquery.SchemaField("args", "STRING"),
bigquery.SchemaField("tip", "STRING"),
bigquery.SchemaField("hash", "STRING"),
bigquery.SchemaField("info", "STRING"),
bigquery.SchemaField("era", "RECORD", fields=[
bigquery.SchemaField("immortalEra", "STRING"),
bigquery.SchemaField("mortalEra", "STRING", mode='REPEATED')
]),
bigquery.SchemaField("events", "RECORD", mode='REPEATED', fields=[
bigquery.SchemaField("method", "RECORD", fields=[
bigquery.SchemaField("pallet", "STRING"),
bigquery.SchemaField("method", "STRING")
]),
bigquery.SchemaField("data", "STRING")
]),
bigquery.SchemaField("success", "BOOLEAN"),
bigquery.SchemaField("paysFee", "BOOLEAN"),
]),
bigquery.SchemaField("onFinalize", "RECORD", fields=[
bigquery.SchemaField("events", "RECORD", mode='REPEATED', fields=[
bigquery.SchemaField("method", "RECORD", fields=[
bigquery.SchemaField("pallet", "STRING"),
bigquery.SchemaField("method", "STRING")
]),
bigquery.SchemaField("data", "STRING")
]),
]),
bigquery.SchemaField("onInitialize", "RECORD", fields=[
bigquery.SchemaField("events", "RECORD", mode='REPEATED', fields=[
bigquery.SchemaField("method", "RECORD", fields=[
bigquery.SchemaField("pallet", "STRING"),
bigquery.SchemaField("method", "STRING")
]),
bigquery.SchemaField("data", "STRING")
]),
]),
bigquery.SchemaField("logs", "RECORD", mode='REPEATED', fields=[
bigquery.SchemaField("type", "STRING"),
bigquery.SchemaField("index", "STRING"),
bigquery.SchemaField("value", "STRING")
])
]

table = bigquery.Table(f"{client.project}.{dataset_id}.{table_id}", schema=schema)
table = client.create_table(table, exists_ok=True)
print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

def insert_block(client, dataset_id, table_id, block_data):
"""
Insert a block into the BigQuery table.
Args:
client (google.cloud.bigquery.client.Client): A BigQuery client.
dataset_id (str): The ID of the dataset containing the table.
table_id (str): The ID of the table to insert into.
block_data (dict): The block data to insert.
"""
table_ref = client.dataset(dataset_id).table(table_id)
errors = client.insert_rows_json(table_ref, [block_data])
if errors:
print(f"Encountered errors while inserting rows: {errors}")
else:
print(f"Inserted 1 row into {dataset_id}.{table_id}")

def update_block(client, dataset_id, table_id, block_number, update_data):
"""
Update a block in the BigQuery table.
Args:
client (google.cloud.bigquery.client.Client): A BigQuery client.
dataset_id (str): The ID of the dataset containing the table.
table_id (str): The ID of the table to update.
block_number (str): The block number to update.
update_data (dict): The data to update the block with.
"""
query = f"""
UPDATE `{client.project}.{dataset_id}.{table_id}`
SET {', '.join([f"{k} = @{k}" for k in update_data.keys()])}
WHERE number = @block_number
"""

job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("block_number", "STRING", block_number),
*[bigquery.ScalarQueryParameter(k, "STRING", v) for k, v in update_data.items()]
]
)

query_job = client.query(query, job_config=job_config)
query_job.result()

print(f"Updated block {block_number} in {dataset_id}.{table_id}")


def query(client, query_str):
"""
Execute a query on BigQuery and return the results as a dataframe.
Args:
client (google.cloud.bigquery.client.Client): A BigQuery client.
query_str (str): The query string to execute.
Returns:
pandas.DataFrame: The query results as a DataFrame.
"""
query_job = client.query(query_str)
results = query_job.result()
df = results.to_dataframe()

return df
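
Put together, a typical wiring of these helpers against the dataset and table from config.yaml might look like the sketch below; the credentials path and block_data payload are placeholders, since the real rows are built by the ingest service from Sidecar responses:

# Illustrative usage of the bigquery_utils helpers; values marked as
# placeholders are not taken from this commit.
from bigquery_utils import connect_to_bigquery, create_blocks_table, insert_block, query

client = connect_to_bigquery("parity-data-infra-evaluation",
                             "/path/to/service-account.json")  # placeholder path
create_blocks_table(client, "pranay_playground", "blocks_hydration")

block_data = {  # placeholder row covering only the REQUIRED schema fields
    "relay_chain": "Polkadot", "chain": "Hydration",
    "timestamp": 1727172000000, "number": "5000000",
    "hash": "0x...", "parentHash": "0x...", "stateRoot": "0x...",
    "extrinsicsRoot": "0x...", "authorId": "0x...", "finalized": True,
}
insert_block(client, "pranay_playground", "blocks_hydration", block_data)

recent = query(client, "SELECT * FROM pranay_playground.blocks_hydration ORDER BY number DESC LIMIT 5")
print(recent)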
