Merged

Commits (155)
e8411f1
test
May 1, 2025
df1c5f1
testing local connection to Opensearch
May 1, 2025
a4025ca
Fix OpenSearch issues
aguest-kc May 29, 2025
2c6da1b
Disable the OpenSearch Dashboards security
aguest-kc Aug 7, 2025
445d412
[DEV-13726] Add DuckDB dependency
aguest-kc Nov 5, 2025
9a0f694
[DEV-13726] Add DuckDB download strategy
aguest-kc Nov 5, 2025
1c48b93
[DEV-13726] Update number formatting and import order
aguest-kc Nov 5, 2025
5cf9413
[DEV-13726] Add DuckDB type hints and imports
aguest-kc Nov 5, 2025
55a2ea9
[DEV-13726] Make Spark downloads work with DuckDB
aguest-kc Nov 5, 2025
b4f9340
[DEV-13726] Clean up write CSV function
aguest-kc Nov 5, 2025
51c4762
[DEV-13726] Pin DuckDB version and install extensions
aguest-kc Nov 5, 2025
1a9a320
[DEV-13726] Time the entire download process
aguest-kc Nov 5, 2025
28c5185
[DEV-13726] Remove DuckDB flag from viewset
aguest-kc Nov 5, 2025
8091b75
[DEV-13726] Manually set memory limit for DuckDB
aguest-kc Nov 6, 2025
ee83d50
[DEV-13726] Update AWS config
aguest-kc Nov 6, 2025
f8b857d
[DEV-13726] Use env vars for AWS secret
aguest-kc Nov 6, 2025
d86e1a1
Merge pull request #4538 from fedspendingtransparency/mod/dev-13706-t…
sethstoudenmier Nov 7, 2025
ee645b9
[DEV-13726] Update the Broker DB URL env variable name
aguest-kc Nov 12, 2025
7b05e11
[DEV-13726] Update Broker env var name
aguest-kc Nov 12, 2025
4ebd337
Merge pull request #4541 from fedspendingtransparency/staging
loreleitrimberger Nov 12, 2025
cb264a3
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 13, 2025
56b7bea
[DEV-13912] fix cache for downloads
loreleitrimberger Nov 13, 2025
b4e4ec3
[DEV-13726] Add comment
aguest-kc Nov 13, 2025
89b7983
[DEV-13912] add test
loreleitrimberger Nov 13, 2025
0c5a730
[DEV-13912] fix test
loreleitrimberger Nov 13, 2025
7018c84
Merge pull request #4544 from fedspendingtransparency/fix/dev-13988-f…
sethstoudenmier Nov 13, 2025
58ea169
[DEV-13912] fix test
loreleitrimberger Nov 13, 2025
d9bf9b6
[DEV-13912] format
loreleitrimberger Nov 14, 2025
4077a84
[DEV-13912] update test
loreleitrimberger Nov 14, 2025
3cbf841
[DEV-13912] update test
loreleitrimberger Nov 14, 2025
56da84c
[DEV-13912] update test
loreleitrimberger Nov 14, 2025
37c4a5b
[DEV-13726] Update S3 auth and CSV generation
aguest-kc Nov 17, 2025
cf75a9c
[DEV-13726] Fix typo in DuckDB secret
aguest-kc Nov 17, 2025
ea2cb08
[DEV-13726] Remove test CSV file row limit
aguest-kc Nov 17, 2025
6a40043
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 18, 2025
8b01fc4
[DEV-13726] Consolidate SQL statements
aguest-kc Nov 18, 2025
4b72ec8
[DEV-13929] add hash key for incremental updates
loreleitrimberger Nov 18, 2025
39e614f
[DEV-13726] Update federal account dataframe
aguest-kc Nov 19, 2025
3b5bc75
[DEV-13726] Update federal account dataframe
aguest-kc Nov 19, 2025
cd7639a
[DEV-13939] fix df, add test
loreleitrimberger Nov 20, 2025
785c672
[DEV-13939] add submission_period to select
loreleitrimberger Nov 20, 2025
b3e6b96
[DEV-13939] fix agemcy_name column ambiguity
loreleitrimberger Nov 20, 2025
94ca534
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
787059a
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
de11d86
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
c8ebf2d
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
35d92ac
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
1d393a4
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
ca977dd
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
5fdd82d
[DEV-13939] test updates
loreleitrimberger Nov 20, 2025
edbed8c
[DEV-13939] format
loreleitrimberger Nov 21, 2025
24881dc
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Nov 21, 2025
ba1ca86
[DEV-13726] Flake8 fixes and comment clean up
aguest-kc Nov 21, 2025
29c25f6
[DEV-13726] black formatting fixes
aguest-kc Nov 21, 2025
f1a54f0
[DEV-13939] update award_financial_download
loreleitrimberger Nov 24, 2025
5184932
[DEV-13939] update award_financial_download
loreleitrimberger Nov 24, 2025
d2057f1
[DEV-13939] update award_financial_download
loreleitrimberger Nov 24, 2025
f0aeff1
[DEV-13939] rearrange tests
loreleitrimberger Nov 25, 2025
e656000
Merge branch 'qat' into ftr/dev-13939-file-a-b-hash
loreleitrimberger Nov 25, 2025
e94dd70
[DEV-13939] fix sam_recipient test
loreleitrimberger Nov 25, 2025
3db951a
[DEV-13939] update incremental
loreleitrimberger Nov 26, 2025
1444a70
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 1, 2025
6c2d449
[DEV-13939] add merge_hash_key check
loreleitrimberger Dec 1, 2025
325b000
[DEV-13726] Update SQL functions
aguest-kc Dec 1, 2025
c18412b
[DEV-13939] update award_financial_download to remove url_encode
loreleitrimberger Dec 1, 2025
74c1e09
[DEV-13939] update award_financial_download to remove url_encode
loreleitrimberger Dec 1, 2025
3ad2e48
[DEV-13939] remove pyarrow
loreleitrimberger Dec 1, 2025
dce5833
[DEV-13726] Add spark parameter
aguest-kc Dec 1, 2025
52960bd
[DEV-13726] Black format fix
aguest-kc Dec 1, 2025
ea2bacd
[DEV-13939] add col to award_financial_downloads
loreleitrimberger Dec 1, 2025
0f1a396
Merge pull request #4551 from fedspendingtransparency/staging
sethstoudenmier Dec 2, 2025
561ca6d
[DEV-13912] fix json_request sorting
loreleitrimberger Dec 3, 2025
2fa4767
[DEV-13912] fix json_request sorting
loreleitrimberger Dec 3, 2025
777bce2
[DEV-13726] Revert default Spark strategy to Databricks
aguest-kc Dec 4, 2025
faa7e16
[DEV-13726] Use LocalStrategy instead of DatabricksStrategy
aguest-kc Dec 4, 2025
8b15fa4
[DEV-13726] Only use Spark for File A downloads
aguest-kc Dec 4, 2025
700762a
[DEV-13939] update tests
loreleitrimberger Dec 5, 2025
a0e1d12
Merge branch 'qat' into ftr/dev-13939-file-a-b-hash
loreleitrimberger Dec 5, 2025
ee829d2
Merge pull request #4547 from fedspendingtransparency/ftr/dev-13939-f…
loreleitrimberger Dec 5, 2025
2f31e1e
Merge branch 'qat' into ftr/dev-13912-fix-download-cache
loreleitrimberger Dec 5, 2025
0a44a7f
Merge pull request #4543 from fedspendingtransparency/ftr/dev-13912-f…
loreleitrimberger Dec 5, 2025
990e37b
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 8, 2025
98339e2
[DEV-13938] Add partitionBy columns
aguest-kc Dec 9, 2025
5e8628c
test upgrade to postgres16
boozallendanny Dec 9, 2025
89e6b21
test
May 1, 2025
e061f2b
testing local connection to Opensearch
May 1, 2025
12caae5
Fix OpenSearch issues
aguest-kc May 29, 2025
5492c40
Disable the OpenSearch Dashboards security
aguest-kc Aug 7, 2025
87bad6a
Merge branch 'mod/esUpgrade' of https://github.com/fedspendingtranspa…
aguest-kc Dec 9, 2025
d369952
install from postgresql repo
boozallendanny Dec 9, 2025
31dad16
Set OpenSearch memory to 2GB
aguest-kc Dec 9, 2025
a1a45ec
Merge pull request #4384 from fedspendingtransparency/mod/esUpgrade
aguest-kc Dec 9, 2025
d25e6a9
[DEV-14150] Add partitioning and enable CDF on transaction_download
sethstoudenmier Dec 9, 2025
c896de6
Merge branch 'qat' into ftr/dev-13938-custom-account-paritioning
aguest-kc Dec 10, 2025
8a2ee5e
[DEV-14150] Formatting
sethstoudenmier Dec 10, 2025
3064ef5
Merge branch 'qat' of https://github.com/fedspendingtransparency/usas…
sethstoudenmier Dec 10, 2025
d8fadcd
[DEV-13938] Rename table spec property
aguest-kc Dec 10, 2025
0ce41ba
[DEV-13938] Update table specs to match
aguest-kc Dec 10, 2025
6522b80
[DEV-13726] Move DuckDB setup to setup_spark_session() method
aguest-kc Dec 10, 2025
ac7e90f
[DEV-13726] Convert to UNIX line endings
aguest-kc Dec 10, 2025
df1927e
[DEV-14150] Add placeholder to table_spec
sethstoudenmier Dec 11, 2025
8a9a4f8
[DEV-14150] Handle the updated table_spec
sethstoudenmier Dec 11, 2025
8e402f3
[DEV-13726] Revert Spark table logic
aguest-kc Dec 11, 2025
d12f8a8
[DEV-13726] Replaced hardcoded values with variables
aguest-kc Dec 11, 2025
6fb3e58
[DEV-13726] Remove case that would never be reached
aguest-kc Dec 11, 2025
c0adbd1
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 11, 2025
cd118f9
Merge pull request #4554 from fedspendingtransparency/mod/dev-14150-t…
sethstoudenmier Dec 12, 2025
41fab45
[DEV-13726] Explain why DuckDB version is pinned
aguest-kc Dec 12, 2025
0756469
[DEV-13726] Create a DuckDB Dockerfile
aguest-kc Dec 12, 2025
b8aea3d
[DEV-13726] Use CONFIG more
aguest-kc Dec 12, 2025
dd5c153
[DEV-13726] Log as exceptions and raise exception
aguest-kc Dec 12, 2025
a3086e1
[DEV-14093] Initial work towards triggering EMR job
sethstoudenmier Dec 12, 2025
a3d2a97
[DEV-13726] Add file_number in Pandas
aguest-kc Dec 12, 2025
2645a8a
Merge branch 'qat' into ftr/dev-13726-duckdb-file-b-downloads
aguest-kc Dec 12, 2025
454e6ba
[DEV-13726] Add `rel` definition back and flake8 fix
aguest-kc Dec 15, 2025
5ff2806
[DEV-14093] cleanup
sethstoudenmier Dec 15, 2025
012b5a1
Merge branch 'qat' into ftr/dev-13938-custom-account-paritioning
aguest-kc Dec 15, 2025
e63e52d
Merge pull request #4548 from fedspendingtransparency/ftr/dev-13726-d…
aguest-kc Dec 15, 2025
dd2a0ab
[DEV-13939] add enableChangeDataFeed for tables
loreleitrimberger Dec 15, 2025
2b6c35c
[DEV-14093] handle merge conflict
sethstoudenmier Dec 15, 2025
0294722
[DEV-13938] Flake8 fix
aguest-kc Dec 15, 2025
b2a612c
Merge branch 'qat' into ftr/dev-13938-custom-account-paritioning
aguest-kc Dec 15, 2025
58eac42
Merge branch 'qat' into ftr/dev-13939-file-a-b-hash
loreleitrimberger Dec 15, 2025
9629626
Merge pull request #4559 from fedspendingtransparency/ftr/dev-13939-f…
loreleitrimberger Dec 15, 2025
7ba8372
[DEV-14106] - Add award categories to file c download
zachflanders-frb Dec 15, 2025
6f89392
Merge branch 'qat' into ftr/dev-13938-custom-account-paritioning
aguest-kc Dec 15, 2025
62dc6b8
[DEV-13938] Flake8 fixes
aguest-kc Dec 15, 2025
831c1e8
[DEV-14106] - Update groupby to include is_fpds
zachflanders-frb Dec 15, 2025
1654891
[DEV-14106] - fixing tests
zachflanders-frb Dec 15, 2025
bd8216c
[DEV-13938] Remove partitionBy in INSERT function
aguest-kc Dec 16, 2025
528ec55
Use DuckDB v1.4.3
aguest-kc Dec 16, 2025
553ceb6
Use DuckDB v1.4.3
aguest-kc Dec 16, 2025
e113bbf
[DEV-13938] Remove partitionBy() in load
aguest-kc Dec 16, 2025
7f106a8
[DEV-14106] - Add tests, update mixin
zachflanders-frb Dec 16, 2025
7febe71
Merge branch 'qat' into ftr/dev-14106-file-c-download-award-categories
zachflanders-frb Dec 16, 2025
c19512d
Merge pull request #4556 from fedspendingtransparency/ftr/dev-13938-c…
aguest-kc Dec 16, 2025
59a3d4f
Merge branch 'qat' into ftr/dev-14106-file-c-download-award-categories
zachflanders-frb Dec 16, 2025
b868eeb
typo: replace paramater with parameter
thsmale Dec 9, 2025
e7276e3
Merge branch 'qat' into test/duckdb-1.4.3
aguest-kc Dec 16, 2025
a11b50d
Merge pull request #4561 from fedspendingtransparency/test/duckdb-1.4.3
aguest-kc Dec 16, 2025
897b6b2
Merge branch 'qat' into ftr/dev-14106-file-c-download-award-categories
zachflanders-frb Dec 16, 2025
35a6be3
Merge pull request #4560 from fedspendingtransparency/ftr/dev-14106-f…
zachflanders-frb Dec 16, 2025
e9959f5
Merge branch 'qat' into fix/dev-14183-parameter-typo
zachflanders-frb Dec 16, 2025
76bcb26
Merge pull request #4562 from fedspendingtransparency/fix/dev-14183-p…
zachflanders-frb Dec 17, 2025
aa9cae0
Merge branch 'qat' into mod/postgresql16
zachflanders-frb Dec 17, 2025
37c832d
[DEV-13912] update download response to be sorted
loreleitrimberger Dec 17, 2025
581da18
Merge pull request #4553 from fedspendingtransparency/mod/postgresql16
zachflanders-frb Dec 17, 2025
a6efeef
Merge branch 'qat' of https://github.com/fedspendingtransparency/usas…
loreleitrimberger Dec 17, 2025
9bfade2
Merge branch 'qat' into ftr/dev-14093-trigger-emr-job
sethstoudenmier Dec 17, 2025
e3ff3af
[DEV-13912] sort everything, readd unsorted columns
loreleitrimberger Dec 17, 2025
c22f957
Merge pull request #4558 from fedspendingtransparency/ftr/dev-14093-t…
sethstoudenmier Dec 17, 2025
ecedc0a
[DEV-13912] fix check for columns
loreleitrimberger Dec 18, 2025
8c3bb8e
[DEV-13912] re-add sorted_json_request]
loreleitrimberger Dec 18, 2025
8f5d8d7
Merge branch 'qat' into ftr/dev-13912-fix-download-cache
loreleitrimberger Dec 18, 2025
bf527b0
Merge pull request #4564 from fedspendingtransparency/ftr/dev-13912-f…
zachflanders-frb Dec 18, 2025
16 changes: 12 additions & 4 deletions Dockerfile
@@ -10,17 +10,25 @@ FROM python:3.10.12-slim-bullseye

WORKDIR /dockermount

##### Install postgres 16
RUN apt-get update && apt-get install -y wget gnupg lsb-release \
&& wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - \
&& echo "deb http://apt.postgresql.org/pub/repos/apt/ $(lsb_release -cs)-pgdg main" \
> /etc/apt/sources.list.d/pgdg.list \
&& apt-get update \
&& apt-get install -y postgresql-16


RUN apt update && apt install -y \
curl \
gcc \
libpq-dev \
postgresql-13
libpq-dev

##### Copy python packages
COPY . /dockermount

RUN python3 -m pip install -r requirements/requirements.txt && \
python3 -m pip install -r requirements/requirements-server.txt && \
python3 -m pip install ansible==2.9.15 awscli==1.34.19

##### Ensure Python STDOUT gets sent to container logs
# Ensure Python STDOUT gets sent to container logs
ENV PYTHONUNBUFFERED=1
12 changes: 12 additions & 0 deletions Dockerfile.duckdb
@@ -0,0 +1,12 @@
# Dockerfile for downloads using DuckDB

FROM usaspending-backend:latest

ENV DUCKDB_VERSION=1.4.3

# Install DuckDB extensions
RUN mkdir -p /root/.duckdb/extensions/v$DUCKDB_VERSION/linux_amd64 && \
curl http://extensions.duckdb.org/v$DUCKDB_VERSION/linux_amd64/delta.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v$DUCKDB_VERSION/linux_amd64/delta.duckdb_extension && \
curl http://extensions.duckdb.org/v$DUCKDB_VERSION/linux_amd64/aws.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v$DUCKDB_VERSION/linux_amd64/aws.duckdb_extension && \
curl http://extensions.duckdb.org/v$DUCKDB_VERSION/linux_amd64/httpfs.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v$DUCKDB_VERSION/linux_amd64/httpfs.duckdb_extension && \
curl http://extensions.duckdb.org/v$DUCKDB_VERSION/linux_amd64/postgres_scanner.duckdb_extension.gz | gunzip > /root/.duckdb/extensions/v$DUCKDB_VERSION/linux_amd64/postgres_scanner.duckdb_extension
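
Because the extensions are vendored into the image at build time, a container can load them without any network access. A minimal runtime check might look like the following sketch, using the duckdb package pinned in requirements-app.txt (illustrative only, not part of this PR):

import duckdb

# Extensions resolve from ~/.duckdb/extensions/v<version>/linux_amd64,
# which is exactly where the Dockerfile above placed them.
con = duckdb.connect()
for ext in ("delta", "aws", "httpfs"):
    con.execute(f"LOAD {ext};")  # succeeds offline because the files are pre-installed
print(con.execute("SELECT extension_name FROM duckdb_extensions() WHERE loaded").fetchall())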
22 changes: 12 additions & 10 deletions docker-compose.yml
@@ -167,7 +167,7 @@ services:
- usaspending # must pass --profile usaspending to docker compose for this to come up
- test
- ci
image: docker.elastic.co/elasticsearch/elasticsearch:7.1.1
image: opensearchproject/opensearch:2.9.0
container_name: usaspending-es
environment:
- node.name=usaspending-es
@@ -177,35 +177,36 @@
- network.host=0.0.0.0
- transport.host=localhost
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms2048m -Xmx2048m" # Ensure Docker is allocated plenty of memory, otherwise this will fail
- plugins.security.disabled=true
- "OPENSEARCH_JAVA_OPTS=-Xms2g -Xmx2g" # Ensure Docker is allocated plenty of memory, otherwise this will fail
# Inject plugin install, then resume with original entrypoint command
command: >
/bin/sh -c "
if [ ! -d /usr/share/elasticsearch/plugins/mapper-murmur3 ]; then
# Certificate problem workaround when on VPN - wget without checking cert, then install from local filesystem
wget --no-check-certificate https://artifacts.elastic.co/downloads/elasticsearch-plugins/mapper-murmur3/mapper-murmur3-7.1.1.zip
./bin/elasticsearch-plugin install file:///usr/share/elasticsearch/mapper-murmur3-7.1.1.zip
if [ ! -d /usr/share/opensearch/plugins/mapper-murmur3 ]; then
/usr/share/opensearch/bin/opensearch-plugin install mapper-murmur3
fi
/usr/local/bin/docker-entrypoint.sh"
/usr/share/opensearch/opensearch-docker-entrypoint.sh"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- type: volume
source: local_es_data
target: /usr/share/elasticsearch/data
target: /usr/share/opensearch/data
ports:
- 9200:9200

usaspending-kibana-es:
profiles:
- usaspending # must pass --profile usaspending to docker compose for this to come up
image: docker.elastic.co/kibana/kibana-oss:7.1.1
image: opensearchproject/opensearch-dashboards:2.9.0
container_name: usaspending-kibana-es
# OPENSEARCH_HOSTS should match the port for "usaspending-es"
environment:
- ELASTICSEARCH_HOSTS="http://docker.for.mac.localhost:9200"
- OPENSEARCH_HOSTS="http://usaspending-es:9200"
- DISABLE_SECURITY_DASHBOARDS_PLUGIN=true
- DISABLE_SECURITY_PLUGIN=true
ports:
- 5601:5601

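With the security plugins disabled on both containers, plain HTTP is enough to smoke-test the stack from the host. A hedged example using the port mappings above (requests is just one convenient client, not an asserted project dependency):

import requests

# OpenSearch is published on localhost:9200 with security disabled,
# so no TLS or credentials are involved.
info = requests.get("http://localhost:9200").json()
print(info["version"]["number"])  # should report the 2.9.0 image pinned above
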
@@ -351,6 +352,7 @@ services:
# make docker-compose-run profiles="--profile spark" args="--rm -e MINIO_HOST=minio -e JDBC_URL -e COMPONENT_NAME='My Spark Prototype Script' spark-submit \
# --packages org.postgresql:postgresql:42.2.23,io.delta:delta-core_2.12:1.2.1,org.apache.hadoop:hadoop-aws:3.3.1,org.apache.spark:spark-hive_2.12:3.2.1 \
# /project/usaspending_api/etl/tests/path_to_your_spark_prototype_script.py"

spark-submit:
profiles:
- spark # must pass --profile spark to docker compose for this to come up
1 change: 1 addition & 0 deletions requirements/requirements-app.txt
@@ -16,6 +16,7 @@ djangorestframework==3.15.*
docutils==0.20.1
drf-api-tracking==1.8.4
drf-extensions==0.7.*
duckdb==1.4.3 # Pinned because DuckDB extensions have to be manually installed for each specific version
elasticsearch-dsl==7.4.*
elasticsearch==7.10.*
et-xmlfile==1.1.0
@@ -14,7 +14,7 @@ This endpoint returns federal budgetary resources by fiscal year and fiscal peri
+ `fiscal_year` (optional, number)
The fiscal year to retrieve, 2017 or later.
+ `fiscal_period` (optional, number)
The fiscal period. If this optional parameter is provided then `fiscal_year` is a required parameter. If `fiscal_period` is provided without `fiscal_year`, a 400 error is returned. Valid values: 2-12 (2 = November ... 12 = September). For retrieving quarterly data, provide the period which equals 'quarter * 3' (e.g. Q2 = P6). If neither paramater is provided, the entire available history will be returned.
The fiscal period. If this optional parameter is provided then `fiscal_year` is a required parameter. If `fiscal_period` is provided without `fiscal_year`, a 400 error is returned. Valid values: 2-12 (2 = November ... 12 = September). For retrieving quarterly data, provide the period which equals 'quarter * 3' (e.g. Q2 = P6). If neither parameter is provided, the entire available history will be returned.

+ Response 200 (application/json)

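For orientation, the quarter-to-period arithmetic described above as a tiny helper (hypothetical, not part of this endpoint or codebase):

def quarter_to_fiscal_period(quarter: int) -> int:
    """Map a fiscal quarter (1-4) to the period that closes it, e.g. Q2 -> P6."""
    if not 1 <= quarter <= 4:
        raise ValueError("quarter must be between 1 and 4")
    return quarter * 3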
201 changes: 181 additions & 20 deletions usaspending_api/common/etl/spark.py
@@ -7,11 +7,16 @@

import logging
import math
import os
import shutil
import time
from collections import namedtuple
from itertools import chain
from typing import List

import duckdb
from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
from duckdb.experimental.spark.sql.dataframe import DataFrame as DuckDBDataFrame
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, concat, concat_ws, expr, lit, regexp_replace, to_date, transform, when
from pyspark.sql.types import ArrayType, DecimalType, StringType, StructType
@@ -48,6 +53,7 @@
ZipsGrouped,
)
from usaspending_api.reporting.models import ReportingAgencyMissingTas, ReportingAgencyOverview
from usaspending_api.settings import CSV_LOCAL_PATH, IS_LOCAL, USASPENDING_AWS_REGION
from usaspending_api.submissions.models import DABSSubmissionWindowSchedule, SubmissionAttributes

MAX_PARTITIONS = CONFIG.SPARK_MAX_PARTITIONS
@@ -555,31 +561,123 @@ def _generate_global_view_sql_strings(tables: List[str], jdbc_url: str) -> List[
return sql_strings


def create_ref_temp_views(spark: SparkSession, create_broker_views: bool = False):
def create_ref_temp_views(spark: SparkSession | DuckDBSparkSession, create_broker_views: bool = False):
"""Create global temporary Spark reference views that sit atop remote PostgreSQL RDS tables
Setting create_broker_views to True will create views for all tables listed in _BROKER_REF_TABLES
Note: They will all be listed under global_temp.{table_name}

Args:
spark (SparkSession | DuckDBSparkSession): Spark session
create_broker_views (bool): Should the temporary views, using the Broker tables, be created
Default: False
"""

# Create USAS temp views
rds_ref_tables = build_ref_table_name_list()
rds_sql_strings = _generate_global_view_sql_strings(
tables=rds_ref_tables,
jdbc_url=get_usas_jdbc_url(),
)
logger.info(f"Creating the following tables under the global_temp database: {rds_ref_tables}")
for sql_statement in rds_sql_strings:
spark.sql(sql_statement)

# Create Broker temp views
if create_broker_views:
broker_sql_strings = _generate_global_view_sql_strings(
tables=_BROKER_REF_TABLES,
jdbc_url=get_broker_jdbc_url(),
)
logger.info(f"Creating the following Broker tables under the global_temp database: {_BROKER_REF_TABLES}")
for sql_statement in broker_sql_strings:
spark.sql(sql_statement)

match isinstance(spark, DuckDBSparkSession):
case True:
logger.info("Creating ref temp views using DuckDB")

if IS_LOCAL:
spark.sql(
f"""
CREATE OR REPLACE SECRET (
TYPE s3,
PROVIDER config,
KEY_ID '{CONFIG.AWS_ACCESS_KEY}',
SECRET '{CONFIG.AWS_SECRET_KEY}',
ENDPOINT '{CONFIG.AWS_S3_ENDPOINT}',
URL_STYLE 'path',
USE_SSL 'false'
);
"""
)
else:
# DuckDB will prepend the HTTP or HTTPS so we need to strip it from the AWS endpoint URL
endpoint_url = CONFIG.AWS_S3_ENDPOINT.replace("http://", "").replace("https://", "")
spark.sql(
f"""
CREATE OR REPLACE SECRET (
TYPE s3,
REGION '{USASPENDING_AWS_REGION}',
ENDPOINT '{endpoint_url}',
PROVIDER 'credential_chain'
);
"""
)

_download_delta_tables = [
{"schema": "rpt", "table_name": "account_balances_download"},
{"schema": "rpt", "table_name": "object_class_program_activity_download"},
]

# The DuckDB Delta extension is needed to interact with DeltaLake tables
spark.sql("LOAD delta; CREATE SCHEMA IF NOT EXISTS rpt;")
for table in _download_delta_tables:
s3_path = (
f"s3://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/{table['schema']}/{table['table_name']}"
)
try:
spark.sql(
f"""
CREATE OR REPLACE TABLE {table["schema"]}.{table["table_name"]} AS
SELECT * FROM delta_scan('{s3_path}');
"""
)
logger.info(f"Successfully created table {table['schema']}.{table['table_name']}")
except duckdb.IOException:
logger.exception(f"Failed to create table {table['table_name']}")
raise RuntimeError(f"Failed to create table {table['table_name']}")

# The DuckDB Postgres extension is needed to connect to the USAS Postgres DB
spark.sql("LOAD postgres; CREATE SCHEMA IF NOT EXISTS global_temp;")
spark.sql(f"ATTACH '{CONFIG.DATABASE_URL}' AS usas (TYPE postgres, READ_ONLY);")

for table in rds_ref_tables:
try:
spark.sql(f"CREATE OR REPLACE VIEW global_temp.{table} AS SELECT * FROM usas.public.{table};")
except duckdb.CatalogException:
logger.exception(f"Failed to create view {table} for {table}")
raise RuntimeError(f"Failed to create view {table} for {table}")

if create_broker_views:
spark.sql(
f"""
ATTACH '{CONFIG.BROKER_DB}' AS broker (TYPE postgres, READ_ONLY);
"""
)
logger.info(
f"Creating the following Broker tables under the global_temp database: {_BROKER_REF_TABLES}"
)
for table in _BROKER_REF_TABLES:
try:
spark.sql(f"CREATE OR REPLACE VIEW global_temp.{table} AS SELECT * FROM broker.public.{table};")
except duckdb.CatalogException:
logger.exception(f"Failed to create view {table} for {table}")
raise RuntimeError(f"Failed to create view {table} for {table}")
case False:
logger.info("Creating ref temp views using Spark")

rds_sql_strings = _generate_global_view_sql_strings(
tables=rds_ref_tables,
jdbc_url=get_usas_jdbc_url(),
)

for sql_statement in rds_sql_strings:
spark.sql(sql_statement)

if create_broker_views:
broker_sql_strings = _generate_global_view_sql_strings(
tables=_BROKER_REF_TABLES,
jdbc_url=get_broker_jdbc_url(),
)
logger.info(
f"Creating the following Broker tables under the global_temp database: {_BROKER_REF_TABLES}"
)
for sql_statement in broker_sql_strings:
spark.sql(sql_statement)

logger.info("Created the reference views in the global_temp database")

@@ -595,9 +693,10 @@ def write_csv_file(
) -> int:
"""Write DataFrame data to CSV file parts.
Args:
spark: passed-in active SparkSession
df: the DataFrame wrapping the data source to be dumped to CSV.
parts_dir: Path to dir that will contain the outputted parts files from partitions
spark: Passed-in active SparkSession
df: The DataFrame wrapping the data source to be dumped to CSV.
parts_dir: Path to dir that will contain the outputted parts files from partitions
num_partitions: Indicates the number of partitions to use when writing the Dataframe
overwrite: Whether to replace the file CSV files if they already exist by that name
max_records_per_file: Suggestion to Spark of how many records to put in each written CSV file part,
if it will end up writing multiple files.
@@ -635,6 +734,68 @@
return df_record_count


def write_csv_file_duckdb(
df: DuckDBDataFrame,
download_file_name: str,
temp_csv_directory_path: str = CSV_LOCAL_PATH,
max_records_per_file: int = EXCEL_ROW_LIMIT,
logger: logging.Logger | None = None,
delimiter: str = ",",
) -> tuple[int, list[str]]:
"""Write DataFrame data to CSV file parts.
Args:
df: The DataFrame wrapping the data source to be dumped to CSV.
download_file_name: Name of the download being generated.
temp_csv_directory_path: Directory that will contain the individual CSV files before zipping.
Defaults to CSV_LOCAL_PATH
max_records_per_file: Max number of records to put in each written CSV file.
Defaults to EXCEL_ROW_LIMIT
logger: Logging instance to use.
Defaults to None
delimiter: Character used to separate columns in the CSV
Defaults to ","
Returns:
record count of the DataFrame that was used to populate the CSV file(s)
list of full path(s) to the temp CSV file(s)
"""
start = time.time()
logger = logger or logging.getLogger(__name__)  # default is None, but logger is used unconditionally below
_pandas_df = df.toPandas()
_pandas_df["file_number"] = (_pandas_df.index // max_records_per_file) + 1
df_record_count = len(_pandas_df)
rel = duckdb.from_df(_pandas_df)

full_file_paths = []

logger.info(f"Writing source data DataFrame to csv files for file {download_file_name}")
rel.to_csv(
file_name=f"{temp_csv_directory_path}{download_file_name}",
sep=delimiter,
escapechar='"',
header=True,
partition_by=["file_number"],
write_partition_columns=False, # Don't include the columns that are used for partitioning in the CSV
overwrite=True,
)

# Move and rename the CSV files to match the expected format
_partition_dirs = [
f"{temp_csv_directory_path}{download_file_name}/{d}"
for d in os.listdir(f"{temp_csv_directory_path}{download_file_name}")
]
for dir in _partition_dirs:
_old_csv_path = f"{dir}/{os.listdir(dir)[0]}"
_new_csv_path = (
f"{temp_csv_directory_path}{download_file_name}/{download_file_name}_{dir.split('=')[1].zfill(2)}.csv"
)
shutil.move(_old_csv_path, _new_csv_path)
full_file_paths.append(_new_csv_path)
os.rmdir(dir)

logger.info(f"{temp_csv_directory_path}{download_file_name} contains {df_record_count:,} rows of data")
logger.info(f"Wrote source data DataFrame to {len(full_file_paths)} CSV files in {(time.time() - start):3f}s")
return df_record_count, full_file_paths
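
A hypothetical invocation for orientation — the DataFrame variable, file name, and logger below are placeholders, not values taken from this codebase:

import logging

# `duckdb_df` would come from a DuckDBSparkSession query upstream.
row_count, csv_paths = write_csv_file_duckdb(
    df=duckdb_df,
    download_file_name="FY2025_File_B",
    logger=logging.getLogger(__name__),
)
# csv_paths -> [".../FY2025_File_B/FY2025_File_B_01.csv", ...]; each part is
# capped at EXCEL_ROW_LIMIT rows via the file_number partitioning above.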


def _merge_file_parts(fs, out_stream, conf, hadoop, partial_merged_file_path, part_file_list):
"""Read-in files in alphabetical order and append them one by one to the merged file"""
