-
Notifications
You must be signed in to change notification settings - Fork 0
/
postgres_utils.py
180 lines (157 loc) · 5.68 KB
/
postgres_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import psycopg2
from psycopg2 import Error
import pandas as pd
import json
def connect_to_postgres(host, port, database, user, password):
    """
    Open a psycopg2 connection to a PostgreSQL server.

    A status line is printed in both the success and the failure case;
    connection errors are reported that way rather than raised.

    Args:
        host (str): The database host.
        port (int): The database port.
        database (str): The name of the database.
        user (str): The database user.
        password (str): The database password.

    Returns:
        psycopg2.extensions.connection: The open connection, or None when
        psycopg2 reports an error while connecting.
    """
    settings = {
        "host": host,
        "port": port,
        "database": database,
        "user": user,
        "password": password,
    }
    try:
        conn = psycopg2.connect(**settings)
    except Error as exc:
        print(f"Error connecting to PostgreSQL database: {exc}")
        return None

    print("Successfully connected to PostgreSQL database")
    return conn
def create_tables(connection, chain, relay_chain, *, drop_existing=True):
    """
    Create the ``blocks_{relay_chain}_{chain}`` table in the PostgreSQL database.

    By default an existing table with that name is dropped first (through
    ``delete_table``), so the table always ends up freshly created and empty.
    Pass ``drop_existing=False`` to keep an existing table and its rows; in
    that case the table is only created when it does not exist yet.

    Database errors are printed, not raised. After a failure the current
    transaction is rolled back so the connection stays usable.

    Args:
        connection (psycopg2.extensions.connection): The database connection object.
        chain (str): The name of the chain.
        relay_chain (str): The name of the relay chain.
        drop_existing (bool): Keyword-only. When True (the default), drop any
            existing table before creating it.
    """
    # NOTE(review): the table name is interpolated straight into the SQL text,
    # so chain and relay_chain must be trusted, identifier-safe strings.
    table_name = f"blocks_{relay_chain}_{chain}"
    cursor = None
    try:
        cursor = connection.cursor()
        if drop_existing:
            delete_table(connection, table_name)
        cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                relay_chain VARCHAR(255),
                chain VARCHAR(255),
                timestamp BIGINT,
                number VARCHAR(255) PRIMARY KEY,
                hash VARCHAR(255),
                parenthash VARCHAR(255),
                stateroot VARCHAR(255),
                extrinsicsroot VARCHAR(255),
                authorid VARCHAR(255),
                finalized BOOLEAN,
                oninitialize JSONB,
                onfinalize JSONB,
                logs JSONB,
                extrinsics JSONB
            )
        """)
        connection.commit()
        print("Tables created successfully")
    except Error as e:
        print(f"Error creating tables: {e}")
        try:
            # A failed statement leaves the transaction aborted; roll back so
            # later statements on this connection are not rejected.
            connection.rollback()
        except Error as rollback_error:
            print(f"Error rolling back after failed table creation: {rollback_error}")
    finally:
        if cursor is not None:
            cursor.close()
def insert_block_data(connection, block_data, chain, relay_chain):
    """
    Insert one processed block into ``blocks_{relay_chain}_{chain}``.

    The row is upserted on the ``number`` primary key: an existing row with
    the same block number has all of its other columns overwritten. The
    ``onInitialize``, ``onFinalize``, ``logs`` and ``extrinsics`` entries are
    serialized with ``json.dumps`` before being sent.

    Database errors are printed, not raised. After a failure the current
    transaction is rolled back so the connection stays usable for the next
    block. A key missing from ``block_data`` raises ``KeyError``.

    Args:
        connection (psycopg2.extensions.connection): The database connection object.
        block_data (dict): The block data to be inserted. Must contain the keys
            relay_chain, chain, timestamp, number, hash, parentHash, stateRoot,
            extrinsicsRoot, authorId, finalized, onInitialize, onFinalize,
            logs and extrinsics.
        chain (str): The name of the chain.
        relay_chain (str): The name of the relay chain.
    """
    # NOTE(review): the table name is interpolated straight into the SQL text,
    # so chain and relay_chain must be trusted, identifier-safe strings. The
    # row values themselves are passed as bound parameters.
    insert_query = f"""
        INSERT INTO blocks_{relay_chain}_{chain}
        (relay_chain, chain, timestamp, number, hash, parenthash, stateroot, extrinsicsroot, authorid, finalized, oninitialize, onfinalize, logs, extrinsics)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (number) DO UPDATE SET
        relay_chain = EXCLUDED.relay_chain,
        chain = EXCLUDED.chain,
        timestamp = EXCLUDED.timestamp,
        hash = EXCLUDED.hash,
        parenthash = EXCLUDED.parenthash,
        stateroot = EXCLUDED.stateroot,
        extrinsicsroot = EXCLUDED.extrinsicsroot,
        authorid = EXCLUDED.authorid,
        finalized = EXCLUDED.finalized,
        oninitialize = EXCLUDED.oninitialize,
        onfinalize = EXCLUDED.onfinalize,
        logs = EXCLUDED.logs,
        extrinsics = EXCLUDED.extrinsics
    """
    cursor = None
    try:
        cursor = connection.cursor()
        values = (
            block_data['relay_chain'],
            block_data['chain'],
            block_data['timestamp'],
            block_data['number'],
            block_data['hash'],
            block_data['parentHash'],
            block_data['stateRoot'],
            block_data['extrinsicsRoot'],
            block_data['authorId'],
            block_data['finalized'],
            json.dumps(block_data['onInitialize']),
            json.dumps(block_data['onFinalize']),
            json.dumps(block_data['logs']),
            json.dumps(block_data['extrinsics']),
        )
        cursor.execute(insert_query, values)
        connection.commit()
        print(f"Block {block_data['number']} inserted/updated successfully")
    except Error as e:
        print(f"Error inserting block data: {e}")
        try:
            # A failed statement leaves the transaction aborted; roll back so
            # later inserts on this connection are not rejected.
            connection.rollback()
        except Error as rollback_error:
            print(f"Error rolling back after failed insert: {rollback_error}")
    finally:
        if cursor is not None:
            cursor.close()
def close_connection(connection):
    """
    Close the PostgreSQL database connection if one was given.

    A falsy ``connection`` (for example None) is ignored silently; otherwise
    the connection is closed and a confirmation line is printed.

    Args:
        connection (psycopg2.extensions.connection): The database connection object.
    """
    if not connection:
        return
    connection.close()
    print("PostgreSQL connection closed")
def delete_table(connection, table_name):
    """
    Delete a table from the PostgreSQL database.

    Runs ``DROP TABLE IF EXISTS`` and commits, so the success message is
    printed whether or not the table existed beforehand. Database errors are
    printed, not raised. After a failure the current transaction is rolled
    back so the connection stays usable.

    Args:
        connection (psycopg2.extensions.connection): The database connection object.
        table_name (str): The name of the table to be deleted.
    """
    # Bound before the try so the finally clause can test it even when
    # connection.cursor() itself raises (e.g. on a closed connection).
    cursor = None
    try:
        cursor = connection.cursor()
        # NOTE(review): table_name is interpolated straight into the SQL text,
        # so it must be a trusted, identifier-safe string.
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        connection.commit()
        print(f"Table '{table_name}' has been successfully deleted.")
    except Error as e:
        print(f"Error deleting table: {e}")
        try:
            # A failed statement leaves the transaction aborted; roll back so
            # later statements on this connection are not rejected.
            connection.rollback()
        except Error as rollback_error:
            print(f"Error rolling back after failed table deletion: {rollback_error}")
    finally:
        if cursor is not None:
            cursor.close()
def query(connection, query_str):
    """
    Execute a SQL statement and return its result set as a pandas DataFrame.

    Column names are taken from the cursor description. This function does
    not commit. Database errors are printed, not raised, and the current
    transaction is rolled back so the connection stays usable.

    Args:
        connection (psycopg2.extensions.connection): The database connection object.
        query_str (str): The SQL text to execute, passed to the cursor as-is.

    Returns:
        pandas.DataFrame: One row per fetched record. An empty DataFrame is
        returned when the statement produces no result set. None is returned
        when a database error occurs.
    """
    # Bound before the try so the finally clause can test it even when
    # connection.cursor() itself raises (e.g. on a closed connection).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(query_str)
        if cursor.description is None:
            # No result set (e.g. DDL): there is nothing to fetch.
            return pd.DataFrame()
        columns = [desc[0] for desc in cursor.description]
        return pd.DataFrame(cursor.fetchall(), columns=columns)
    except Error as e:
        print(f"Error executing query: {e}")
        try:
            # A failed statement leaves the transaction aborted; roll back so
            # later statements on this connection are not rejected.
            connection.rollback()
        except Error as rollback_error:
            print(f"Error rolling back after failed query: {rollback_error}")
        return None
    finally:
        if cursor is not None:
            cursor.close()