5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 2025-05-16
- Rework `/ingestion` to use the DuckDB BigQuery extension instead of the BigQuery Python API (see the sketch below).
- Bump DuckDB to `1.2.2`.
- Bump dbt-duckdb to `1.9.3`.
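A minimal sketch of the new ingestion flow, assuming DuckDB's community `bigquery` extension; the GCP project id and the inner query are placeholders:

```python
# Minimal sketch, assuming the community "bigquery" extension is available
# and BigQuery credentials are already configured in the environment.
import duckdb

con = duckdb.connect()
con.execute("INSTALL bigquery FROM community;")  # one-time install
con.execute("LOAD bigquery;")
# bigquery_query() runs the inner query on BigQuery and returns the result
tbl = con.sql(
    "SELECT * FROM bigquery_query('my-gcp-project', "
    "'SELECT project FROM `bigquery-public-data.pypi.file_downloads` LIMIT 10')"
).arrow()
```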

## 2025-02-09
- Bumping DuckDB to `1.2.0`.

4 changes: 3 additions & 1 deletion Dockerfile
@@ -1,5 +1,7 @@
ARG PLATFORM=amd64

# Stage 1: Base
FROM python:3.12 as base
FROM --platform=linux/${PLATFORM} python:3.12 as base

# Install UV via pip
RUN pip install uv==0.5.5 --no-cache-dir
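With this change, the default build still targets `linux/amd64`, while an arm64 image can presumably be produced by overriding the build arg, e.g. `docker build --build-arg PLATFORM=arm64 .` (the build context and any tag are illustrative).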
5 changes: 4 additions & 1 deletion README.md
@@ -10,7 +10,10 @@ The project is a monorepo composed of a series in 3 parts:
- transformation, under `transform` folder ([YouTube video](https://www.youtube.com/watch?v=SpfEQQXBGMQ), [Blog](https://motherduck.com/blog/duckdb-dbt-e2e-data-engineering-project-part-2/))
- Visualization, under `dashboard` folder ([YouTube video](https://youtu.be/ta_Pzc2EEEo), [Blog](https://motherduck.com/blog/duckdb-dashboard-e2e-data-engineering-project-part-3/))

Please refer to the [`CHANGELOG.md`](./CHANGELOG.md) for the latest updates.
> ⚠️ This project has undergone significant changes since the tutorials were recorded.
> To follow along with the original tutorial content, please check out the [`feat/tutorial-archive` branch](https://github.com/mehd-io/pypi-duck-flow/tree/feat/tutorial-archive).

You can also refer to the [`CHANGELOG.md`](./CHANGELOG.md) for a complete list of updates.

## High level architecture
![High level architecture](./docs/etl_architecture.png)
72 changes: 14 additions & 58 deletions ingestion/bigquery.py
@@ -1,73 +1,29 @@
import os
from google.cloud import bigquery
from google.oauth2 import service_account
from google.auth.exceptions import DefaultCredentialsError
from loguru import logger
import time
from ingestion.models import PypiJobParameters, FileDownloads
import pyarrow as pa
from ingestion.models import PypiJobParameters

PYPI_PUBLIC_DATASET = "bigquery-public-data.pypi.file_downloads"


def build_pypi_query(
params: PypiJobParameters, pypi_public_dataset: str = PYPI_PUBLIC_DATASET
) -> str:
# Query the public PyPI dataset from BigQuery
"""Build an optimized BigQuery query for PyPI file downloads
# /!\ This is a large dataset, filter accordingly /!\
"""

return f"""
SELECT *
SELECT
timestamp,
country_code,
url,
project,
file,
details,
tls_protocol,
tls_cipher
FROM
`{pypi_public_dataset}`
WHERE
project = '{params.pypi_project}'
project = ''{params.pypi_project}''
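-- doubled single quotes: presumably an escape so the project literal survives embedding in a single-quoted DuckDB query string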
AND {params.timestamp_column} >= TIMESTAMP("{params.start_date}")
AND {params.timestamp_column} < TIMESTAMP("{params.end_date}")
"""


def get_bigquery_client(project_name: str) -> bigquery.Client:
"""Get Big Query client"""
try:
service_account_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

if service_account_path:
credentials = service_account.Credentials.from_service_account_file(
service_account_path
)
bigquery_client = bigquery.Client(
project=project_name, credentials=credentials
)
return bigquery_client

raise EnvironmentError(
"No valid credentials found for BigQuery authentication."
)

except DefaultCredentialsError as creds_error:
raise creds_error




def get_bigquery_result(
query_str: str, bigquery_client: bigquery.Client, model: FileDownloads
) -> pa.Table:
"""Get query result from BigQuery and yield rows as dictionaries."""
try:
# Start measuring time
start_time = time.time()
# Run the query and load the result directly into a PyArrow Table
logger.info(f"Running query: {query_str}")
# dataframe = bigquery_client.query(query_str).to_dataframe(dtypes=FileDownloads().pandas_dtypes)
pa_tbl = bigquery_client.query(query_str).to_arrow()
# Log the time taken for query execution and data loading
elapsed_time = time.time() - start_time
logger.info(f"Query executed and data loaded in {elapsed_time:.2f} seconds")
# Return the full query result as a PyArrow Table
return pa_tbl

except Exception as e:
logger.error(f"Error running query: {e}")
raise
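Putting it together, a hedged sketch of how the generated query is presumably consumed on the DuckDB side: the `PypiJobParameters` construction and the GCP project id are illustrative, and the real model may require more fields than the four referenced in `build_pypi_query`.

```python
# Illustrative wiring only: field names mirror those referenced in
# build_pypi_query, but the real PypiJobParameters model may differ.
import duckdb
from ingestion.bigquery import build_pypi_query
from ingestion.models import PypiJobParameters

params = PypiJobParameters(
    pypi_project="duckdb",
    timestamp_column="timestamp",
    start_date="2025-05-01",
    end_date="2025-05-02",
)
query = build_pypi_query(params)

con = duckdb.connect()
con.execute("INSTALL bigquery FROM community;")
con.execute("LOAD bigquery;")
# Wrapping `query` in single quotes here is what the doubled quotes around
# the project name were escaped for; they resolve back to 'duckdb' in BigQuery.
tbl = con.sql(f"SELECT * FROM bigquery_query('my-gcp-project', '{query}')").arrow()
```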