-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmigrate_sec_database.py
More file actions
205 lines (171 loc) · 7.64 KB
/
migrate_sec_database.py
File metadata and controls
205 lines (171 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
"""
Migrate SEC data to its own database
1. Create 'sec' database
2. Copy philgeps.contractors table to sec.contractors
3. Drop philgeps.contractors table
4. Recreate philgeps.contractors as cleaned version of project_contractors
"""
import asyncio
import asyncpg
import os
from dotenv import load_dotenv
from sync_flood_contractors import split_joint_venture, is_valid_contractor_name
# Read the local `.env` file at import time so the POSTGRES_* settings that
# main() looks up with os.getenv() can be supplied from that file.
load_dotenv('.env')
async def _connect(database):
    """Open an asyncpg connection to ``database``.

    Host, port, user and password are read from the POSTGRES_HOST,
    POSTGRES_PORT, POSTGRES_USER and POSTGRES_PASSWORD environment variables,
    with the same fallbacks for every connection this script opens.
    """
    return await asyncpg.connect(
        host=os.getenv('POSTGRES_HOST', 'localhost'),
        port=int(os.getenv('POSTGRES_PORT', 5432)),
        user=os.getenv('POSTGRES_USER', 'budget_admin'),
        password=os.getenv('POSTGRES_PASSWORD', ''),
        database=database,
    )


async def _ensure_sec_database() -> bool:
    """Step 1: create the 'sec' database.

    Returns True when the database was created or already existed, and False
    when creation failed for any other reason (the error is printed).
    """
    # Connect to the 'postgres' database to create the new database.
    postgres_conn = await _connect('postgres')
    try:
        print("π Step 1: Creating 'sec' database...")
        await postgres_conn.execute('CREATE DATABASE sec')
    except asyncpg.DuplicateDatabaseError:
        # Catch the typed error instead of matching the server's message
        # text, which changes with the server locale.
        print("✅ 'sec' database already exists\n")
    except Exception as e:
        print(f"❌ Error creating database: {e}")
        return False
    else:
        print("✅ Created 'sec' database\n")
    finally:
        await postgres_conn.close()
    return True


async def _create_sec_contractors_table(sec_conn):
    """Step 2: create sec.contractors and its partial unique index if missing."""
    print("π Step 2: Creating contractors table in 'sec' database...")
    await sec_conn.execute('''
        CREATE TABLE IF NOT EXISTS contractors (
            id SERIAL PRIMARY KEY,
            contractor_name TEXT NOT NULL,
            sec_number VARCHAR(255),
            date_registered DATE,
            status VARCHAR(50),
            address TEXT,
            secondary_licenses TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            project_count INTEGER DEFAULT 0,
            source TEXT DEFAULT 'unknown',
            former_id INTEGER REFERENCES contractors(id)
        )
    ''')
    # Unique on sec_number, but only for rows that actually have one.
    await sec_conn.execute('''
        CREATE UNIQUE INDEX IF NOT EXISTS contractors_sec_number_unique
        ON contractors (sec_number)
        WHERE sec_number IS NOT NULL
    ''')
    print("✅ Created contractors table in 'sec' database\n")


async def _copy_contractors_to_sec(philgeps_conn, sec_conn):
    """Step 3: copy every philgeps.contractors row into sec.contractors.

    Rows are copied one at a time so a single bad row is reported without
    stopping the rest. Returns ``(copied, failed)``; ``failed`` counts rows
    that could not be inserted plus ``former_id`` links that could not be set.
    """
    print("π Step 3: Copying data from philgeps.contractors to sec.contractors...")
    contractors = await philgeps_conn.fetch('SELECT * FROM contractors')
    total = len(contractors)
    print(f" Found {total} contractors to copy")

    copied = 0
    failed = 0
    # former_id references contractors(id) in the same table. Rows arrive in
    # no particular order, so the link is applied in a second pass, once
    # every row it could point at has been inserted.
    former_links = []
    for contractor in contractors:
        try:
            await sec_conn.execute(
                '''
                INSERT INTO contractors (
                    id, contractor_name, sec_number, date_registered, status,
                    address, secondary_licenses, created_at, updated_at,
                    project_count, source
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ''',
                contractor['id'],
                contractor['contractor_name'],
                contractor['sec_number'],
                contractor['date_registered'],
                contractor['status'],
                contractor['address'],
                contractor.get('secondary_licenses'),
                contractor['created_at'],
                contractor['updated_at'],
                contractor.get('project_count', 0),
                contractor.get('source', 'unknown'),
            )
        except Exception as e:
            failed += 1
            print(f"⚠️ Error copying contractor {contractor['id']}: {e}")
            continue

        copied += 1
        former_id = contractor.get('former_id')
        if former_id is not None:
            former_links.append((contractor['id'], former_id))
        if copied % 1000 == 0:
            print(f" Progress: {copied}/{total}...")

    for contractor_id, former_id in former_links:
        try:
            await sec_conn.execute(
                'UPDATE contractors SET former_id = $1 WHERE id = $2',
                former_id,
                contractor_id,
            )
        except Exception as e:
            failed += 1
            print(
                f"⚠️ Error linking contractor {contractor_id} "
                f"to former_id {former_id}: {e}"
            )

    # The ids above were inserted explicitly, which does not advance the
    # SERIAL sequence. Move it past the highest copied id so later inserts
    # that rely on the default do not collide with copied rows.
    await sec_conn.execute('''
        SELECT setval(
            pg_get_serial_sequence('contractors', 'id'),
            COALESCE(MAX(id), 1),
            MAX(id) IS NOT NULL
        )
        FROM contractors
    ''')

    print(f"✅ Copied {copied} contractors to sec.contractors\n")
    return copied, failed


def _clean_contractor_names(raw_names):
    """Split raw names into individual contractors and return the valid ones.

    Each raw name goes through ``split_joint_venture``; every resulting
    ``'name'`` that is non-blank and accepted by ``is_valid_contractor_name``
    is stripped and added to the returned set.
    """
    cleaned = set()
    for raw_name in raw_names:
        # Split using the same logic as the flood sync.
        for part in split_joint_venture(raw_name):
            name = part['name']
            if name and name.strip() and is_valid_contractor_name(name):
                cleaned.add(name.strip())
    return cleaned


async def _rebuild_philgeps_contractors(philgeps_conn) -> int:
    """Steps 4-7: replace philgeps.contractors with names from project_contractors.

    Drops the old table, creates the slimmer replacement, then inserts the
    cleaned contractor names. Returns the number of rows inserted.
    """
    # Drop and recreate in one transaction so a failure while creating the
    # new table rolls the drop back instead of leaving no table at all.
    async with philgeps_conn.transaction():
        print("π Step 4: Dropping old philgeps.contractors table...")
        await philgeps_conn.execute('DROP TABLE IF EXISTS contractors CASCADE')
        print("✅ Dropped philgeps.contractors\n")

        print("π Step 5: Creating new philgeps.contractors from project_contractors...")
        await philgeps_conn.execute('''
            CREATE TABLE contractors (
                id SERIAL PRIMARY KEY,
                contractor_name TEXT NOT NULL UNIQUE,
                project_count INTEGER DEFAULT 0,
                source TEXT DEFAULT 'project_contractors',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
    print("✅ Created new philgeps.contractors table\n")

    print("π Step 6: Extracting and cleaning contractors from project_contractors...")
    raw_contractors = await philgeps_conn.fetch('''
        SELECT DISTINCT contractor_name
        FROM project_contractors
        WHERE contractor_name IS NOT NULL
    ''')
    print(f" Found {len(raw_contractors)} raw contractor names")
    all_individual_contractors = _clean_contractor_names(
        row['contractor_name'] for row in raw_contractors
    )
    total = len(all_individual_contractors)
    print(f" After cleaning and splitting: {total} unique contractors\n")

    print("π Step 7: Inserting cleaned contractors into philgeps.contractors...")
    inserted = 0
    # Sorted so the new ids are assigned in a stable, alphabetical order.
    for contractor_name in sorted(all_individual_contractors):
        try:
            await philgeps_conn.execute(
                '''
                INSERT INTO contractors (contractor_name, source)
                VALUES ($1, $2)
                ''',
                contractor_name,
                'project_contractors',
            )
            inserted += 1
            if inserted % 100 == 0:
                print(f" Progress: {inserted}/{total}...")
        except Exception as e:
            print(f"⚠️ Error inserting '{contractor_name}': {e}")
    print(f"✅ Inserted {inserted} cleaned contractors into philgeps.contractors\n")
    return inserted


async def main():
    """Move the SEC contractor data into its own database.

    Creates the 'sec' database and its contractors table, copies
    philgeps.contractors into it, then replaces philgeps.contractors with a
    cleaned list of names taken from project_contractors.

    The old philgeps.contractors table is only dropped when every row (and
    every former_id link) was copied; otherwise the migration stops with a
    message and leaves philgeps untouched. Connections are closed on every
    exit path, including errors.
    """
    # NOTE(review): the leading "π" on the banner lines looks like the
    # remains of a mis-encoded emoji. It is kept as found because the
    # intended glyph cannot be recovered from this file - confirm against the
    # original source.
    print("π Starting SEC database migration...\n")

    if not await _ensure_sec_database():
        return

    philgeps_conn = await _connect('philgeps')
    try:
        sec_conn = await _connect('sec')
        try:
            await _create_sec_contractors_table(sec_conn)
            copied, failed = await _copy_contractors_to_sec(philgeps_conn, sec_conn)
            if failed:
                # The next step is destructive: anything that did not reach
                # 'sec' would be lost for good once the source is dropped.
                print(
                    f"❌ {failed} error(s) while copying; "
                    "philgeps.contractors was NOT dropped. "
                    "Fix the errors above and re-run."
                )
                return
            inserted = await _rebuild_philgeps_contractors(philgeps_conn)
        finally:
            await sec_conn.close()
    finally:
        await philgeps_conn.close()

    print("✅ Migration completed!")
    print("\nSummary:")
    print(f" • Created 'sec' database with {copied} contractors (SEC data)")
    print(f" • Created new philgeps.contractors with {inserted} cleaned contractors")
    print(" • Source: project_contractors (cleaned and split)")
# Script entry point: run the migration only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())