-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsync_philgeps_parallel.py
More file actions
194 lines (154 loc) · 6.95 KB
/
sync_philgeps_parallel.py
File metadata and controls
194 lines (154 loc) · 6.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/env python3
"""
Match PhilGEPS contractors with SEC database using parallel processing
Uses 5 threads for faster execution
"""
import asyncio
import asyncpg
import os
from dotenv import load_dotenv
from difflib import SequenceMatcher
import re
from concurrent.futures import ThreadPoolExecutor
import threading
load_dotenv('.env')
# Legal-form words that carry no identifying information and are dropped
# before names are compared.
_LEGAL_SUFFIXES = frozenset({
    'CORPORATION', 'CORP', 'INC', 'INCORPORATED', 'CO', 'COMPANY',
    'LTD', 'LIMITED', 'ENTERPRISES', 'ENTERPRISE',
})
# Punctuation that is treated as a word separator.
_PUNCTUATION_TO_SPACE = str.maketrans('.,-', '   ')
_WHITESPACE_RUN = re.compile(r'\s+')


def normalize_contractor_name(name):
    """Normalize a contractor name for fuzzy matching.

    The name is upper-cased, ``.``, ``,`` and ``-`` become spaces, ``&``
    becomes the separate word ``AND``, whitespace runs are collapsed, and
    legal-form words (CORP, INC, LTD, ...) are removed.

    Args:
        name: Raw contractor name; may be ``None`` or empty.

    Returns:
        The normalized name, or ``""`` when ``name`` is falsy.  If every
        word is a legal-form word, the cleaned name is returned with those
        words kept, so such a name does not collapse to an empty string.
    """
    if not name:
        return ""

    normalized = name.upper().translate(_PUNCTUATION_TO_SPACE)
    # Pad with spaces so "A&B" and "A & B" both normalize to "A AND B"
    # instead of "AANDB" vs "A AND B".
    normalized = normalized.replace('&', ' AND ')
    normalized = _WHITESPACE_RUN.sub(' ', normalized).strip()

    kept_words = [
        word for word in normalized.split() if word not in _LEGAL_SUFFIXES
    ]
    return ' '.join(kept_words) if kept_words else normalized
def fuzzy_match(name1, name2, threshold=0.88):
    """Decide whether two contractor names refer to the same entity.

    Names match when they are identical, when their normalized forms are
    identical, or when the ``SequenceMatcher`` similarity ratio of the
    normalized forms is at least ``threshold``.

    Args:
        name1: First raw name; falsy values never match.
        name2: Second raw name; falsy values never match.
        threshold: Minimum similarity ratio (0.0-1.0) to count as a match.

    Returns:
        ``True`` if the names are considered a match, otherwise ``False``.
    """
    if not name1 or not name2:
        return False
    if name1 == name2:
        return True

    norm1 = normalize_contractor_name(name1)
    norm2 = normalize_contractor_name(name2)
    # Different names that normalize to nothing (e.g. "." vs ",") carry no
    # information to compare; without this guard they would "match".
    if not norm1 or not norm2:
        return False
    if norm1 == norm2:
        return True

    matcher = SequenceMatcher(None, norm1, norm2)
    # real_quick_ratio() and quick_ratio() are cheap upper bounds on
    # ratio(), so a pair that fails either one cannot reach the threshold;
    # checking them first skips the expensive ratio() for most non-matches.
    return (
        matcher.real_quick_ratio() >= threshold
        and matcher.quick_ratio() >= threshold
        and matcher.ratio() >= threshold
    )
def check_philgeps_match(sec_contractor, philgeps_names, philgeps_normalized):
    """Check whether one SEC contractor matches any PhilGEPS awardee.

    The function only reads its arguments, so the same ``philgeps_names``
    and ``philgeps_normalized`` objects can be shared between worker calls.

    Args:
        sec_contractor: Mapping-like row with ``'id'`` and
            ``'contractor_name'`` keys.
        philgeps_names: Iterable of raw PhilGEPS awardee names.
        philgeps_normalized: Container of normalized PhilGEPS names that
            supports ``in`` (the caller passes a dict keyed by normalized
            name).

    Returns:
        A ``(contractor_id, has_match)`` tuple.
    """
    contractor_id = sec_contractor['id']
    contractor_name = sec_contractor['contractor_name']

    norm_sec = normalize_contractor_name(contractor_name)
    # A missing or punctuation-only name normalizes to "" and must not be
    # matched against a PhilGEPS name that also normalized to "".
    if not norm_sec:
        return contractor_id, False

    # Cheap path first: exact match on the normalized form.
    if norm_sec in philgeps_normalized:
        return contractor_id, True

    # Fall back to the slow pairwise fuzzy comparison.
    for philgeps_name in philgeps_names:
        if fuzzy_match(contractor_name, philgeps_name):
            return contractor_id, True

    return contractor_id, False
async def _connect(database):
    """Open an asyncpg connection to ``database`` using the POSTGRES_* environment settings."""
    return await asyncpg.connect(
        host=os.getenv('POSTGRES_HOST', 'localhost'),
        port=int(os.getenv('POSTGRES_PORT', 5432)),
        user=os.getenv('POSTGRES_USER', 'budget_admin'),
        password=os.getenv('POSTGRES_PASSWORD', ''),
        database=database,
    )


async def main():
    """Flag SEC contractors that also appear as PhilGEPS awardees.

    Steps:
      1. Load the distinct non-null ``awardee_name`` values from
         ``contracts`` in the ``philgeps`` database.
      2. Load ``id`` and ``contractor_name`` for every row of
         ``contractors`` in the ``sec`` database.
      3. Run ``check_philgeps_match`` for each SEC contractor on a
         5-worker thread pool.
      4. Set ``has_philgeps = true`` for the matched ids in batches of 1000.
      5. Print overlap counts for the ``has_flood`` / ``has_dime`` /
         ``has_philgeps`` flags.

    Both database connections are closed even if a step raises.
    """
    # NOTE(review): glyphs such as "π", "π΅", "π’" and "π‘" in the messages
    # below look like mis-decoded emoji; they are reproduced as found and
    # should be restored from the original file.
    print("π Parallel PhilGEPS matching (5 threads)...\n")

    # Get PhilGEPS contractors
    philgeps_conn = await _connect('philgeps')
    try:
        print("π Fetching PhilGEPS awardee names...")
        philgeps_contractors = await philgeps_conn.fetch('''
            SELECT DISTINCT awardee_name
            FROM contracts
            WHERE awardee_name IS NOT NULL
        ''')
        philgeps_names = [row['awardee_name'] for row in philgeps_contractors]
        philgeps_normalized = {
            normalize_contractor_name(name): name for name in philgeps_names
        }
        print(f"✅ Found {len(philgeps_names):,} PhilGEPS contractors\n")
    finally:
        await philgeps_conn.close()

    # Get SEC contractors
    sec_conn = await _connect('sec')
    try:
        print("π Fetching SEC contractors...")
        sec_contractors = await sec_conn.fetch(
            'SELECT id, contractor_name FROM contractors'
        )
        print(f"✅ Found {len(sec_contractors):,} SEC contractors\n")

        print("π Starting parallel fuzzy matching (5 threads)...")
        print(" This will take a few minutes...\n")

        # NOTE(review): the matching is CPU-bound pure Python, so threads
        # likely give little speed-up under the GIL; a process pool may be
        # worth evaluating.
        matches_to_update = []
        completed = 0
        matched = 0
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(
                    check_philgeps_match,
                    sec_contractor,
                    philgeps_names,
                    philgeps_normalized,
                )
                for sec_contractor in sec_contractors
            ]

            # Collect results in submission order, reporting every 500.
            for future in futures:
                contractor_id, has_match = future.result()
                if has_match:
                    matches_to_update.append(contractor_id)
                    matched += 1
                completed += 1
                if completed % 500 == 0:
                    print(f" Progress: {completed:,}/{len(sec_contractors):,} ({matched:,} matched, {(matched/completed*100):.1f}%)...")

        print(f"\n✅ Matching complete: {matched:,} contractors matched with PhilGEPS")
        print("π Updating database...\n")

        # Batch update
        batch_size = 1000
        for i in range(0, len(matches_to_update), batch_size):
            batch = matches_to_update[i:i + batch_size]
            await sec_conn.execute('''
                UPDATE contractors
                SET has_philgeps = true
                WHERE id = ANY($1::int[])
            ''', batch)
            # Report after every batch so the final partial batch is
            # included in the progress output.
            print(f" Updated: {i + len(batch):,}/{len(matches_to_update):,}...")

        print("✅ Database updated!\n")

        # Get final statistics
        stats = await sec_conn.fetchrow('''
            SELECT
                COUNT(*) FILTER (WHERE has_flood AND NOT has_dime AND NOT has_philgeps) as flood_only,
                COUNT(*) FILTER (WHERE has_dime AND NOT has_flood AND NOT has_philgeps) as dime_only,
                COUNT(*) FILTER (WHERE has_philgeps AND NOT has_flood AND NOT has_dime) as philgeps_only,
                COUNT(*) FILTER (WHERE has_flood AND has_dime AND NOT has_philgeps) as flood_dime,
                COUNT(*) FILTER (WHERE has_flood AND has_philgeps AND NOT has_dime) as flood_philgeps,
                COUNT(*) FILTER (WHERE has_dime AND has_philgeps AND NOT has_flood) as dime_philgeps,
                COUNT(*) FILTER (WHERE has_flood AND has_dime AND has_philgeps) as all_three,
                COUNT(*) FILTER (WHERE has_flood) as total_flood,
                COUNT(*) FILTER (WHERE has_dime) as total_dime,
                COUNT(*) FILTER (WHERE has_philgeps) as total_philgeps
            FROM contractors
        ''')

        print("π Final Venn Diagram Data:")
        print(f" π΅ Flood only: {stats['flood_only']:,}")
        print(f" π’ DIME only: {stats['dime_only']:,}")
        print(f" π‘ PhilGEPS only: {stats['philgeps_only']:,}")
        print(f" π΅π’ Flood + DIME: {stats['flood_dime']:,}")
        print(f" π΅π‘ Flood + PhilGEPS: {stats['flood_philgeps']:,}")
        print(f" π’π‘ DIME + PhilGEPS: {stats['dime_philgeps']:,}")
        print(f" π΅π’π‘ All three: {stats['all_three']:,}")
        print()
        print(f" Total per source:")
        print(f" π΅ Flood: {stats['total_flood']:,}")
        print(f" π’ DIME: {stats['total_dime']:,}")
        print(f" π‘ PhilGEPS: {stats['total_philgeps']:,}")
    finally:
        await sec_conn.close()

    print("\n✅ Sync completed!")
# Script entry point: run the async sync job only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())