7 changes: 5 additions & 2 deletions usaspending_api/common/spark/jobs.py
@@ -13,6 +13,7 @@
 from django.conf import settings
 from django.core.management import call_command
 from duckdb.experimental.spark.sql import SparkSession as DuckDBSparkSession
+from usaspending_api.config import CONFIG
 
 from usaspending_api.common.spark.configs import LOCAL_EXTENDED_EXTRA_CONF, OPTIONAL_SPARK_HIVE_JAR, SPARK_SESSION_JARS
 
@@ -165,10 +166,12 @@ def handle_start(self, job_name: str, command_name: str, command_options: list[s
             mode="BATCH",
             jobDriver={
                 "sparkSubmit": {
-                    "entryPoint": command_name,
-                    "entryPointArguments": command_options,
+                    "entryPoint": f"s3://{CONFIG.SPARK_S3_BUCKET}/master/manage.py",
+                    "entryPointArguments": [command_name, *command_options],
                 }
             },
+            # TODO: Requires updating to EMR 7
+            # retryPolicy={"maxAttempts": 2},
         )
         return response
 
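Note: the job driver now always points at a copy of manage.py in S3 and passes the management command plus its options as arguments. A minimal sketch of the resulting EMR Serverless submission, assuming an illustrative application id, role ARN, bucket, and command name (none of these values come from this PR):

```python
# Sketch only: applicationId, executionRoleArn, bucket, and command are placeholders.
import boto3

emr = boto3.client("emr-serverless", region_name="us-east-1")
response = emr.start_job_run(
    applicationId="00fabcdef1234567",  # assumed: resolved from SSM at runtime
    executionRoleArn="arn:aws:iam::123456789012:role/example-emr-job-role",
    mode="BATCH",
    jobDriver={
        "sparkSubmit": {
            # manage.py is the entry point; the management command and its
            # options become positional arguments to it
            "entryPoint": "s3://example-spark-bucket/master/manage.py",
            "entryPointArguments": ["example_download_command", "--skip-local-cleanup"],
        }
    },
)
print(response["jobRunId"])
```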
7 changes: 1 addition & 6 deletions usaspending_api/download/delta_downloads/account_balances.py

@@ -1,5 +1,4 @@
 from pyspark.sql import functions as sf, Column, DataFrame, SparkSession
-from usaspending_api.config import CONFIG
 
 from usaspending_api.common.spark.utils import collect_concat
 from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
@@ -29,11 +28,7 @@ class AccountBalancesMixin:
 
     @property
     def download_table(self) -> DataFrame:
-        # TODO: This should be reverted back after Spark downloads are migrated to EMR
-        # return self.spark.table("rpt.account_balances_download")
-        return self.spark.read.format("delta").load(
-            f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download"
-        )
+        return self.spark.table("rpt.account_balances_download")
 
     def _build_dataframes(self) -> list[DataFrame]:
         return [
9 changes: 2 additions & 7 deletions usaspending_api/download/delta_downloads/award_financial.py

@@ -2,7 +2,6 @@
 
 
 from pyspark.sql import functions as sf, Column, DataFrame, SparkSession
-from usaspending_api.config import CONFIG
 
 from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum
 from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
@@ -31,11 +30,7 @@ class AwardFinancialMixin:
 
     @property
     def download_table(self) -> DataFrame:
-        # TODO: This should be reverted back after Spark downloads are migrated to EMR
-        # return self.spark.table("rpt.award_financial_download")
-        return self.spark.read.format("delta").load(
-            f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
-        )
+        return self.spark.table("rpt.award_financial_download")
 
     @property
     def non_zero_filters(self) -> Column:
@@ -49,7 +44,7 @@ def non_zero_filters(self) -> Column:
     @property
     def award_categories(self) -> dict[str, Column]:
         return {
-            "Assistance": (sf.isnotnull(sf.col("is_fpds")) & ~sf.col("is_fpds")),
+            "Assistance": (~sf.isnull(sf.col("is_fpds")) & ~sf.col("is_fpds")),
             "Contracts": sf.col("is_fpds"),
             "Unlinked": sf.isnull(sf.col("is_fpds")),
         }
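The swap from sf.isnotnull(...) to ~sf.isnull(...) is behavior-preserving under SQL three-valued logic; plausibly it keeps the expression working on Spark-compatible backends that don't expose isnotnull (for example the DuckDB Spark shim these downloads can also run on, or PySpark versions before 3.5). A quick local check, with an assumed throwaway SparkSession:

```python
# Sketch: verify ~isnull(c) & ~c buckets rows the same way isnotnull(c) & ~c would.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(True,), (False,), (None,)], "is_fpds: boolean")
df.select(
    "is_fpds",
    (~sf.isnull(sf.col("is_fpds")) & ~sf.col("is_fpds")).alias("assistance"),
    sf.col("is_fpds").alias("contracts"),
    sf.isnull(sf.col("is_fpds")).alias("unlinked"),
).show()
# is_fpds=False -> assistance=true; True -> contracts=true; NULL -> unlinked=true
```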
usaspending_api/download/delta_downloads/object_class_program_activity.py

@@ -4,7 +4,6 @@
 from pyspark.sql import Column, DataFrame, SparkSession
 
 from usaspending_api.common.spark.utils import collect_concat, filter_submission_and_sum
-from usaspending_api.config import CONFIG
 from usaspending_api.download.delta_downloads.abstract_downloads.account_download import (
     AbstractAccountDownload,
     AccountLevel,
@@ -42,13 +41,7 @@ def __init__(self, *args, **kwargs):
 
     @property
     def download_table(self) -> DataFrame | DuckDBSparkDataFrame:
-        if isinstance(self.spark, DuckDBSparkSession):
-            return self.spark.table("rpt.object_class_program_activity_download")
-        else:
-            # TODO: This should be reverted back after Spark downloads are migrated to EMR
-            return self.spark.read.format("delta").load(
-                f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download"
-            )
+        return self.spark.table("rpt.object_class_program_activity_download")
 
     def _build_dataframes(self) -> list[DataFrame | DuckDBSparkDataFrame]:
         return [
@@ -5,6 +5,8 @@
 import traceback
 from typing import Callable
 
+import boto3
+
 # Third-party library imports
 from opentelemetry.trace import SpanKind, Status, StatusCode
 
@@ -14,7 +16,7 @@
 
 # Application imports
 from usaspending_api.common.logging import configure_logging
-from usaspending_api.common.spark.jobs import SparkJobs, LocalStrategy, DatabricksStrategy
+from usaspending_api.common.spark.jobs import SparkJobs, LocalStrategy, EmrServerlessStrategy
 from usaspending_api.common.sqs.sqs_handler import DownloadLogic, get_sqs_queue
 from usaspending_api.common.sqs.sqs_job_logging import log_job_message
 from usaspending_api.common.sqs.sqs_work_dispatcher import (
@@ -165,9 +167,23 @@ def _run_spark_download(download_job_id: int, job_name: str) -> None:
         command_options = [f"--skip-local-cleanup"]
         extra_options = {"run_as_container": True}
     else:
-        strategy = DatabricksStrategy()
+        strategy = EmrServerlessStrategy()
         command_options = []
-        extra_options = {}
+
+        ssm_client = boto3.client("ssm", settings.USASPENDING_AWS_REGION)
+        param_resp = ssm_client.get_parameters(
+            Names=[settings.EMR_DOWNLOAD_APP_PARAM_NAME, settings.EMR_DOWNLOAD_ROLE_PARAM_NAME], WithDecryption=True
+        )
+        if param_resp.get("InvalidParameters"):
+            logger.error(f"Invalid parameters: {param_resp['InvalidParameters']}")
+            raise ValueError("Invalid parameters")
+        param_values = {param["Name"]: param["Value"] for param in param_resp["Parameters"]}
+
+        extra_options = {
+            "application_id": param_values[settings.EMR_DOWNLOAD_APP_PARAM_NAME],
+            "execution_role_arn": param_values[settings.EMR_DOWNLOAD_ROLE_PARAM_NAME],
+        }
+
     spark_jobs = SparkJobs(strategy)
     spark_jobs.start(
         job_name=job_name,
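For the non-local branch to work, the two SSM parameters must exist before the worker starts. A hypothetical seeding script — the parameter names and values below are placeholders; the real names are whatever EMR_DOWNLOAD_APP_PARAM_NAME and EMR_DOWNLOAD_ROLE_PARAM_NAME resolve to in the environment (SecureString matches the WithDecryption=True read above):

```python
# Hypothetical setup: seed the SSM parameters the worker reads at startup.
import boto3

ssm = boto3.client("ssm", region_name="us-east-1")
ssm.put_parameter(
    Name="/usaspending/emr/download-application-id",  # placeholder name
    Value="00fabcdef1234567",                         # placeholder application id
    Type="SecureString",
    Overwrite=True,
)
ssm.put_parameter(
    Name="/usaspending/emr/download-execution-role",  # placeholder name
    Value="arn:aws:iam::123456789012:role/example-emr-job-role",
    Type="SecureString",
    Overwrite=True,
)
```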
@@ -6,7 +6,6 @@
 import pytest
 from django.core.management import call_command
 from model_bakery import baker
-from usaspending_api.config import CONFIG
 
 from usaspending_api.common.etl.spark import create_ref_temp_views
 from usaspending_api.download.delta_downloads.account_balances import AccountBalancesDownloadFactory
@@ -23,18 +22,12 @@
 
 
 @pytest.fixture(scope="function")
-def award_financial_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch):
+def award_financial_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
     call_command(
         "create_delta_table",
         "--destination-table=award_financial_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.award_financial.AwardFinancialMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
-        ),
-    )
     column_placeholders = {field.name: [None] * 5 for field in award_financial_schema}
     test_data_df = pd.DataFrame(
         data={
@@ -75,18 +68,12 @@ def award_financial_table(spark, s3_unittest_data_bucket, hive_unittest_metastor
 
 
 @pytest.fixture(scope="function")
-def award_financial_table_award_category(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch):
+def award_financial_table_award_category(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
     call_command(
         "create_delta_table",
         "--destination-table=award_financial_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.award_financial.AwardFinancialMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
-        ),
-    )
     column_placeholders = {field.name: [None] * 5 for field in award_financial_schema}
     test_data_df = pd.DataFrame(
         data={
@@ -127,18 +114,12 @@ def award_financial_table_award_category(spark, s3_unittest_data_bucket, hive_un
 
 
 @pytest.fixture(scope="function")
-def account_balances_download_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch):
+def account_balances_download_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
     call_command(
         "create_delta_table",
         "--destination-table=account_balances_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.account_balances.AccountBalancesMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download"
-        ),
-    )
     column_placeholders = {field.name: [None] * 5 for field in account_balances_schema}
     test_data_df = pd.DataFrame(
         data={
@@ -175,20 +156,12 @@ def account_balances_download_table(spark, s3_unittest_data_bucket, hive_unittes
 
 
 @pytest.fixture(scope="function")
-def object_class_by_program_activity_download_table(
-    spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch
-):
+def object_class_by_program_activity_download_table(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
     call_command(
         "create_delta_table",
         "--destination-table=object_class_program_activity_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.object_class_program_activity.ObjectClassProgramActivityMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download"
-        ),
-    )
     column_placeholders = {field.name: [None] * 5 for field in object_class_program_activity_schema}
     test_data_df = pd.DataFrame(
         data={
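These fixtures can drop the monkeypatching because each mixin's download_table now goes through spark.table(...), and create_delta_table registers the table under the rpt schema in the unit-test Hive metastore. A sketch of the equivalence, reusing the exact load path the removed setattr used (spark and s3_unittest_data_bucket are assumed to be the fixture arguments in scope):

```python
# Sketch: what the removed monkeypatch forced explicitly, the metastore now
# provides implicitly. After create_delta_table runs, both reads should match.
from usaspending_api.config import CONFIG

direct = spark.read.format("delta").load(
    f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
)
via_metastore = spark.table("rpt.award_financial_download")  # what download_table calls now
assert direct.count() == via_metastore.count()
```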
@@ -10,7 +10,6 @@
 from django.core.management import call_command
 from model_bakery import baker
 from rest_framework import status
-from usaspending_api.config import CONFIG
 
 from usaspending_api.accounts.models import FederalAccount, TreasuryAppropriationAccount
 from usaspending_api.awards.models import FinancialAccountsByAwards
@@ -23,42 +22,24 @@
 
 
 @pytest.fixture
-def create_download_delta_tables(spark, s3_unittest_data_bucket, hive_unittest_metastore_db, monkeypatch):
+def create_download_delta_tables(spark, s3_unittest_data_bucket, hive_unittest_metastore_db):
     call_command(
         "create_delta_table",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
         f"--destination-table=award_financial_download",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.award_financial.AwardFinancialMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
-        ),
-    )
 
     call_command(
         "create_delta_table",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
         f"--destination-table=object_class_program_activity_download",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.object_class_program_activity.ObjectClassProgramActivityMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download"
-        ),
-    )
 
     call_command(
         "create_delta_table",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
         f"--destination-table=account_balances_download",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.account_balances.AccountBalancesMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download"
-        ),
-    )
     yield
 
 
22 changes: 0 additions & 22 deletions usaspending_api/etl/tests/integration/test_load_to_from_delta.py

@@ -19,7 +19,6 @@
 from django.conf import settings
 from django.core.management import call_command
 from django.db import connection, connections, transaction, models
-from usaspending_api.config import CONFIG
 from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
 from usaspending_api.etl.award_helpers import update_awards
 from usaspending_api.etl.broker_etl_helpers import dictfetchall
@@ -1053,19 +1052,12 @@ def test_load_object_class_program_activity_class(
     s3_unittest_data_bucket,
     hive_unittest_metastore_db,
     populate_usas_data_and_recipients_from_broker,
-    monkeypatch,
 ):
     call_command(
         "create_delta_table",
         "--destination-table=object_class_program_activity_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.object_class_program_activity.ObjectClassProgramActivityMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/object_class_program_activity_download"
-        ),
-    )
 
     verify_delta_table_loaded_to_delta(
         spark,
@@ -1082,7 +1074,6 @@ def test_load_award_financial_download(
     s3_unittest_data_bucket,
     populate_usas_data_and_recipients_from_broker,
     hive_unittest_metastore_db,
-    monkeypatch,
 ):
 
     load_delta_table_from_postgres("published_fabs", s3_unittest_data_bucket)
@@ -1124,12 +1115,6 @@ def test_load_award_financial_download(
         "--destination-table=award_financial_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.award_financial.AwardFinancialMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/award_financial_download"
-        ),
-    )
 
     expected_data = [
         {
@@ -1237,20 +1222,13 @@ def test_load_account_balances_download(
     spark,
     s3_unittest_data_bucket,
     hive_unittest_metastore_db,
-    monkeypatch,
     populate_usas_data_and_recipients_from_broker,
 ):
     call_command(
         "create_delta_table",
         "--destination-table=account_balances_download",
         f"--spark-s3-bucket={s3_unittest_data_bucket}",
     )
-    monkeypatch.setattr(
-        f"usaspending_api.download.delta_downloads.account_balances.AccountBalancesMixin.download_table",
-        spark.read.format("delta").load(
-            f"s3a://{s3_unittest_data_bucket}/{CONFIG.DELTA_LAKE_S3_PATH}/rpt/account_balances_download"
-        ),
-    )
 
     verify_delta_table_loaded_to_delta(
         spark,
9 changes: 9 additions & 0 deletions usaspending_api/settings.py

@@ -91,6 +91,15 @@
 BROKER_AGENCY_BUCKET_NAME = ""
 UNLINKED_AWARDS_DOWNLOAD_REDIRECT_DIR = "unlinked_awards_downloads"
 
+# AWS parameter store key names
+EMR_DOWNLOAD_APP_PARAM_NAME = ""
+if not EMR_DOWNLOAD_APP_PARAM_NAME:
+    EMR_DOWNLOAD_APP_PARAM_NAME = os.environ.get("EMR_DOWNLOAD_APP_PARAM_NAME")
+
+EMR_DOWNLOAD_ROLE_PARAM_NAME = ""
+if not EMR_DOWNLOAD_ROLE_PARAM_NAME:
+    EMR_DOWNLOAD_ROLE_PARAM_NAME = os.environ.get("EMR_DOWNLOAD_ROLE_PARAM_NAME")
+
 # This list contains any abnormal characters in agency names
 # This list is important to track which characters we need to replace in
 # the agency name before the name can be used in a file name
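The literal-then-environment pattern mirrors how other settings in this module are declared (a literal default that can be overridden, with an environment fallback). Since the placeholder literal is empty, each pair behaves the same as a single environment lookup; a condensed equivalent, assuming nothing else assigns these names in between:

```python
# Equivalent behavior: falls back to None when the environment variable is unset.
EMR_DOWNLOAD_APP_PARAM_NAME = os.environ.get("EMR_DOWNLOAD_APP_PARAM_NAME")
EMR_DOWNLOAD_ROLE_PARAM_NAME = os.environ.get("EMR_DOWNLOAD_ROLE_PARAM_NAME")
```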