Commit
refactor(bq-syncer): improve performance and readability
daoleno committed Jan 15, 2024
1 parent 40ed836 commit 19399ff
Showing 1 changed file with 89 additions and 92 deletions.
181 changes: 89 additions & 92 deletions bq-syncer/sync.py
@@ -1,7 +1,9 @@
import argparse
import logging
import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import duckdb
@@ -10,6 +12,8 @@
from google.cloud import bigquery
from google.oauth2 import service_account

logging.basicConfig(level=logging.INFO)

# Initialize BigQuery client
service_account_path = "service_account.json"
credentials = service_account.Credentials.from_service_account_file(
@@ -26,6 +30,13 @@
parser.add_argument(
"-o", "--output", help="Path to the directory to which exports will be saved."
)
parser.add_argument(
"-c",
"--concurrency",
help="Number of concurrent tasks.",
type=int,
default=1,
)
args = parser.parse_args()

dataset_ref = bqclient.dataset("v2_polygon", project="lens-public-data")
@@ -55,7 +66,7 @@ def create_output_directory():
date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
output_directory = os.path.join(args.output, f"v2_polygon_{date_str}")
os.makedirs(output_directory, exist_ok=True)
print(f"Output directory set to: {output_directory}")
logging.info(f"Output directory set to: {output_directory}")
return output_directory


@@ -68,7 +79,7 @@ def update_symbolic_link(output_dir):
if os.path.islink(symlink_path):
os.unlink(symlink_path)
os.symlink(os.path.abspath(output_dir), symlink_path)
print(f"Updated symbolic link {symlink_path} to: {output_dir}")
logging.info(f"Symbolic link updated to: {output_dir}")


def export_tables(conn, output_dir):
@@ -82,11 +93,11 @@ def export_tables(conn, output_dir):
).fetchall()

if not tables:
print("No tables found in the database.")
logging.info("No tables found in the database.")
return

total_tables = len(tables)
print(f"Found {total_tables} tables.")
logging.info(f"Found {total_tables} tables.")

# Export each table to a Parquet file
for index, table in enumerate(tables, start=1):
@@ -95,12 +106,12 @@ def export_tables(conn, output_dir):
conn.execute(
f"COPY {table_name} TO '{parquet_file_path}' (FORMAT 'parquet')"
)
print(
f"[{datetime.now()}] Exported table {index}/{total_tables}: {table_name} to {parquet_file_path}"
logging.info(
f"Exported table {index}/{total_tables}: {table_name} to {parquet_file_path}"
)

except Exception as e:
print(f"An error occurred: {e}")
logging.error(f"An error occurred: {e}")


def delete_old_dir():
@@ -118,113 +129,99 @@ def delete_old_dir():
and os.path.abspath(dir_path) != cur_link_target
):
shutil.rmtree(dir_path)
print(f"Deleted old directory: {dir_path}")
logging.info(f"Deleted old directory: {dir_path}")


def process_table(table, conn, dataset_ref, bqclient, idx):
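# Incrementally sync one BigQuery table into DuckDB: create the table if it
# does not exist yet, then copy only rows newer than the last synced
# datastream_metadata.source_timestamp.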
table_id = table.table_id
table_ref = dataset_ref.table(table_id)

cursor = conn.cursor()

# Check if table exists in DuckDB
cursor.execute(
f"SELECT count(*) FROM information_schema.tables WHERE table_name = '{table_id}'"
)
if cursor.fetchone()[0] == 0:
# Create table in DuckDB
logging.info(f"Creating table: {table_id}")
table_schema = bqclient.get_table(table_ref).schema
converted_schema = convert_schema(table_schema)
ddl = f"CREATE TABLE {table_id} ({', '.join(converted_schema)})"
cursor.execute(ddl)

# Retrieve and sync data
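# MAX(source_timestamp) is the incremental high-water mark; an empty table
# yields NULL, which falls back to 0 and triggers a full initial load.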
last_timestamp_result = cursor.execute(
f"SELECT MAX(source_timestamp) FROM {table_id}"
).fetchone()
last_timestamp = (
last_timestamp_result[0] if last_timestamp_result[0] is not None else 0
)

# Build and execute query
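# Select every column, flattening the nested datastream_metadata RECORD
# down to its source_timestamp field.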
fields = [
f.name
if f.name != "datastream_metadata"
else "datastream_metadata.source_timestamp"
for f in table_schema
]
query = f"""
SELECT {', '.join(fields)}
FROM `{table_ref}`
WHERE datastream_metadata.source_timestamp > {last_timestamp}
"""
query_job = bqclient.query(query)
pageNum = 0
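# Stream the query results one page (10,000 rows) at a time.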
for page in query_job.result(page_size=10000).pages:
pageNum += 1
items = list(page)
if items:
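# Transpose the page's row tuples into per-column lists, keyed by field name.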
df = pl.DataFrame(
{
field.name: list(data)
for field, data in zip(table_schema, zip(*items))
}
)
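# Register the DataFrame as a DuckDB view and append its rows to the table.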
cursor.register("df", df)
cursor.execute(f"INSERT INTO {table_id} SELECT * FROM df")
logging.info(
f"Synced table {idx}: {table_id} - page {pageNum} - {len(items)} rows"
)
logging.info(f"Synced table {idx}: {table_id}")


def perform_sync_task():
global is_task_running

# Check if task is already running
if is_task_running:
print(
f"[{datetime.now()}] Another sync task is already running. Skipping this cycle."
)
logging.info("Another sync task is already running. Skipping this cycle.")
return

is_task_running = True
print(f"[{datetime.now()}] Starting data sync...")
logging.info("Starting data sync...")

# Create a new DuckDB connection for each task
with duckdb.connect(database=args.input) as conn:
cursor = conn.cursor()

try:
tables = list(bqclient.list_tables(dataset)) # Convert to list only once
total_tables = len(tables)
for index, table in enumerate(tables, start=1):
table_id = table.table_id
table_ref = dataset_ref.table(table_id)

# Check if table exists in DuckDB and create if not
cursor.execute(
f"SELECT count(*) FROM information_schema.tables WHERE table_name = '{table_id}'"
)
if cursor.fetchone()[0] == 0:
# Fetch the table schema from BigQuery
table_schema = bqclient.get_table(table_ref).schema
# Convert schema, replacing RECORD type with individual fields
converted_schema = convert_schema(table_schema)
ddl = f"CREATE TABLE {table_id} ({', '.join(converted_schema)})"
cursor.execute(ddl)

# Attempt to retrieve the last sync timestamp from the DuckDB table
last_timestamp_result = cursor.execute(
f"SELECT MAX(source_timestamp) FROM {table_id}"
).fetchone()
last_timestamp = (
last_timestamp_result[0]
if last_timestamp_result[0] is not None
else 0
)

# Build query to fetch new or updated records from BigQuery
# Fetch the table schema from BigQuery
table_schema = bqclient.get_table(table_ref).schema

# Generate list of fields, preserving the original schema's order.
fields = [
f.name
if f.name != "datastream_metadata"
else "datastream_metadata.source_timestamp"
for f in table_schema
]

query = f"""
SELECT {', '.join(fields)}
FROM `{table_ref}`
WHERE datastream_metadata.source_timestamp > {last_timestamp}
"""
query_job = bqclient.query(query)
iterator = query_job.result(
page_size=10000
) # Fetch 10,000 rows per page
page_num = 0
for page in iterator.pages:
page_num += 1
items = list(page)
df = pl.DataFrame(
{
field.name: list(data)
for field, data in zip(table_schema, zip(*items))
}
)
# Insert into DuckDB
if df.height > 0: # Check if DataFrame is not empty
cursor.register("df", df)
cursor.execute(f"INSERT INTO {table_id} SELECT * FROM df")
print(
f"[{datetime.now()}] Synced table {index}/{total_tables}: {table_id}, Page: {page_num}, Rows: {len(items)}"
)
else:
print(
f"[{datetime.now()}] No new records found for table {index}/{total_tables}: {table_id}"
)
tables = list(bqclient.list_tables(dataset))
logging.info(f"Found {len(tables)} tables in BigQuery.")

is_task_running = False
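# Sync tables concurrently; the worker count comes from the -c/--concurrency flag.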
with ThreadPoolExecutor(max_workers=args.concurrency) as executor:
for idx, table in enumerate(tables, start=1):
executor.submit(
process_table, table, conn, dataset_ref, bqclient, idx
)

if args.output:
# export tables to parquet files
output_directory = create_output_directory()
export_tables(conn, output_directory)
update_symbolic_link(output_directory)

# Add this new deletion operation after updating the symbolic link.
delete_old_dir()

print(f"[{datetime.now()}] Data sync completed.")
logging.info(f"Synced {len(tables)} tables.")

except Exception as e:
print(f"An error occurred: {e}")
logging.error(f"An error occurred: {e}")
finally:
is_task_running = False


