Merged
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -55,7 +55,7 @@ services:
dockerfile: Dockerfile.testing
container_name: usaspending-test
volumes:
- .:/usaspending-api
Contributor:
I'm not sure this is necessary to include in the docker-compose.yml since the default assumption is that the docker-compose.yml file is located in the project root directory.

Contributor (Author):
The need for this arises when starting tests or Spark commands from a dev container. The problem is that the volume source path is resolved against the host OS filesystem (in our case, Windows), not the dev container's filesystem. In my case, I need to override the "." with the full Windows path to our project directory.
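For illustration only: docker compose resolves ${PROJECT_DIRECTORY:-.} from the shell environment or from a .env file next to docker-compose.yml, so a Windows dev-container setup might only need a single hypothetical entry like the one below (the path is an example, not a real value):

PROJECT_DIRECTORY=C:\Users\<you>\projects\usaspending-api

When the variable is unset, the :-. fallback keeps the current behavior of mounting the directory that contains docker-compose.yml.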

- ${PROJECT_DIRECTORY:-.}:/usaspending-api
# Required to interact with host's docker daemon from within this running container,
# to spin up the data-act-broker-init-test-db container used for broker integration tests (see: conftest.broker_db_setup)
- /var/run/docker.sock:/var/run/docker.sock
@@ -395,7 +395,7 @@ services:
command: --help
volumes:
- type: bind
source: .
source: ${PROJECT_DIRECTORY:-.}
target: /project
read_only: false
# NOTE: The hive metastore_db Derby database folder is expected to be configured to show up as a subfolder of the spark-warehouse dir
1 change: 1 addition & 0 deletions requirements/requirements-app.txt
@@ -35,6 +35,7 @@ python-dateutil==2.9.*
python-json-logger==2.0.7
requests==2.31.*
retrying==1.3.4
sqlparse==0.5.*
urllib3==1.26.*

# Opentelemetry Dependencies
124 changes: 124 additions & 0 deletions usaspending_api/etl/management/commands/execute_spark_sql.py
@@ -0,0 +1,124 @@
import logging
import sqlparse

from django.core.management.base import BaseCommand, CommandError
from pyspark.sql import SparkSession

from usaspending_api.common.etl.spark import create_ref_temp_views
from usaspending_api.common.helpers.spark_helpers import (
configure_spark_session,
get_active_spark_session,
)
from usaspending_api.common.retrieve_file_from_uri import RetrieveFileFromUri

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = """
This command executes Spark SQL statements from either a string or a file.
The DataFrame returned by each statement is printed to standard out using the df.show() method.
"""

# Values defined in the handler
spark: SparkSession

def add_arguments(self, parser):

parser.add_argument(
"--sql",
type=str,
required=False,
help="A string containing semicolon-separated Spark SQL statements.",
)

parser.add_argument(
"--file",
type=str,
required=False,
help="Path to file containing semicolon-separated SQL statements. Can be a local file path or S3/HTTP url",
)

parser.add_argument(
"--create-temp-views",
action="store_true",
required=False,
help="Controls whether all (USAs and Broker) temp views will be created before sql execution",
)

parser.add_argument(
"--result-limit",
type=int,
required=False,
default=20,
help="Maximum number of result records to display from Pyspark dataframe.",
)

parser.add_argument(
"--dry-run",
action="store_true",
required=False,
help="Print SQL statements without executing",
)

def handle(self, *args, **options):
# Resolve Parameters
sql_input = options.get("sql")
file_path = options.get("file")
create_temp_views = options.get("create_temp_views")
result_limit = options.get("result_limit")
dry_run = options.get("dry_run")

extra_conf = {
# Config for Delta Lake tables and SQL. Need these to keep Delta table metadata in the metastore
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
# Old date and time values cannot be parsed without these legacy rebase settings (see notes below)
"spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY", # for dates at/before 1900
"spark.sql.legacy.parquet.int96RebaseModeInWrite": "LEGACY", # for timestamps at/before 1900
"spark.sql.jsonGenerator.ignoreNullFields": "false", # keep nulls in our json
}

# Prepare SQL Statements from either provided string or file
if file_path and sql_input:
raise CommandError("Cannot use both --sql and --file. Choose one.")
elif file_path:
with RetrieveFileFromUri(file_path).get_file_object(text=True) as f:
sql_statement_string = f.read()
elif sql_input:
sql_statement_string = sql_input
else:
raise CommandError("Either --sql or --file must be provided")

sql_statements = [query.strip() for query in sqlparse.split(sql_statement_string) if query.strip()]

logger.info(f"Found {len(sql_statements)} SQL statement(s)")

# Prepare the Spark session after parameters have been resolved and
# SQL statements have been identified
self.spark = get_active_spark_session()
spark_created_by_command = False
if not self.spark:
spark_created_by_command = True
self.spark = configure_spark_session(**extra_conf, spark_context=self.spark) # type: SparkSession

if create_temp_views:
create_ref_temp_views(self.spark, create_broker_views=True)

# Execute SQL Statements
for idx, statement in enumerate(sql_statements, 1):
logger.info(f"--- Statement {idx} ---")
logger.info(statement)

if dry_run:
logger.info("[DRY RUN - Not executed]")
else:
try:
df = self.spark.sql(statement)
df.show(result_limit)
logger.info("Executed successfully")
except Exception as e:
logger.info(f"Error: {e}")

if spark_created_by_command:
self.spark.stop()
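The command leans on sqlparse.split (the sqlparse==0.5.* dependency added above) to break the input into individual statements. A minimal sketch of that behavior, mirroring the list comprehension in handle() and assuming sqlparse 0.5:

import sqlparse

raw = "SHOW DATABASES; SELECT 1;\n\n"
# split() keeps each statement's trailing semicolon; empty fragments are filtered out
statements = [s.strip() for s in sqlparse.split(raw) if s.strip()]
print(statements)  # ['SHOW DATABASES;', 'SELECT 1;']

Invocation follows the usual Django management command pattern, e.g. python manage.py execute_spark_sql --sql "SHOW DATABASES; SELECT 1;" --dry-run, with --file, --create-temp-views, and --result-limit available as shown in add_arguments above.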
0 changes: 0 additions & 0 deletions usaspending_api/etl/tests/data/test_execute_spark_files/empty_file.sql
Empty file.
3 changes: 3 additions & 0 deletions usaspending_api/etl/tests/data/test_execute_spark_files/two_queries_with_blank_line.sql
@@ -0,0 +1,3 @@
SHOW DATABASES;

SELECT 1;
65 changes: 65 additions & 0 deletions usaspending_api/etl/tests/integration/test_execute_spark_sql.py
@@ -0,0 +1,65 @@
from pytest import raises

from unittest.mock import patch, MagicMock

from django.core.management import call_command
from django.core.management.base import CommandError


# Argument Validation


def test_both_sql_and_file_raises_error():
with raises(
CommandError,
match="Cannot use both --sql and --file. Choose one.",
):
call_command("execute_spark_sql", "--sql", "SHOW DATABASES;", "--file", "test.sql")


def test_no_sql_and_file_raises_error():
with raises(
CommandError,
match="Either --sql or --file must be provided",
):
call_command("execute_spark_sql")


# Integration Tests


def test_sql_functionality():
"""Tests a variety of cases in a single test for Spark performance"""

# Test --sql approach
with patch("usaspending_api.etl.management.commands.execute_spark_sql.logger") as mock_logger:
call_command("execute_spark_sql", "--sql", "SHOW DATABASES; SELECT 1;")
mock_logger.info.assert_any_call("Found 2 SQL statement(s)")

# Test --file approach with two SQL statements and a blank line
test_file_path = "usaspending_api/etl/tests/data/test_execute_spark_files/two_queries_with_blank_line.sql"
with patch("usaspending_api.etl.management.commands.execute_spark_sql.logger") as mock_logger:
call_command("execute_spark_sql", "--file", test_file_path)
mock_logger.info.assert_any_call("Found 2 SQL statement(s)")

# Test --file approach with an empty file
test_file_path = "usaspending_api/etl/tests/data/test_execute_spark_files/empty_file.sql"
with patch("usaspending_api.etl.management.commands.execute_spark_sql.logger") as mock_logger:
call_command("execute_spark_sql", "--file", test_file_path)
mock_logger.info.assert_any_call("Found 0 SQL statement(s)")

# Test SQL executes
mock_spark = MagicMock()
with patch(
"usaspending_api.etl.management.commands.execute_spark_sql.get_active_spark_session", return_value=mock_spark
):
call_command("execute_spark_sql", "--sql", "SELECT 1;")
mock_spark.sql.assert_called()

# Test --dry-run does not execute SQL
mock_spark = MagicMock()
with patch(
"usaspending_api.etl.management.commands.execute_spark_sql.get_active_spark_session", return_value=mock_spark
):
call_command("execute_spark_sql", "--sql", "SELECT 1;", "--dry-run")
mock_spark.sql.assert_not_called()