[DEV-14237] Create execute_spark_sql Command #4569
Merged
9 commits
- baa09ca (collinwr): [DEV-14237] Adds new command to run Spark SQL commands. Updates docke…
- 42cee43 (collinwr): [DEV-14237] Reformats with black
- 6e27ef6 (collinwr): [DEV-14237] Adds placeholder files for unit/integration tests. Splits…
- 3094351 (collinwr): Updates --sql argument to accept multiple statements. Adds unit/integ…
- b252dcc (collinwr): Reorders imports. Extra test to ensure sql executes
- 0e6ea5f (collinwr): Merge branch 'qat' into ftr/dev-14237-execute-spark-sql
- 77e94f5 (collinwr): Adds type hints to helper function
- 6fd7500 (collinwr): Replaces custom sql split function with that from sqlparse
- 14fd170 (collinwr): Fixes merge conflicts by removing helper method replaced by sqlparse …
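Commit 6fd7500 swaps the PR's original hand-rolled statement splitter for `sqlparse.split`. As a minimal sketch of that splitting behavior (the sample SQL string below is illustrative, not taken from the PR):

```python
import sqlparse

# Semicolon-separated statements, including a blank line, similar to what the command accepts
raw_sql = "SHOW DATABASES;\n\nSELECT 1;"

# sqlparse.split returns one string per statement; empty fragments are filtered out,
# mirroring the list comprehension used in the command
statements = [s.strip() for s in sqlparse.split(raw_sql) if s.strip()]
print(statements)  # ['SHOW DATABASES;', 'SELECT 1;']
```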
124 changes: 124 additions & 0 deletions
124
usaspending_api/etl/management/commands/execute_spark_sql.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
```python
import logging
import sqlparse

from django.core.management.base import BaseCommand, CommandError
from pyspark.sql import SparkSession

from usaspending_api.common.etl.spark import create_ref_temp_views
from usaspending_api.common.helpers.spark_helpers import (
    configure_spark_session,
    get_active_spark_session,
)
from usaspending_api.common.retrieve_file_from_uri import RetrieveFileFromUri

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = """
    This command executes Spark SQL commands from either a file or a string.
    The resulting dataframe will be printed to standard out using the df.show() method
    """

    # Values defined in the handler
    spark: SparkSession

    def add_arguments(self, parser):

        parser.add_argument(
            "--sql",
            type=str,
            required=False,
            help="A string containing semicolon-separated Spark SQL statements.",
        )

        parser.add_argument(
            "--file",
            type=str,
            required=False,
            help="Path to file containing semicolon-separated SQL statements. Can be a local file path or S3/HTTP url",
        )

        parser.add_argument(
            "--create-temp-views",
            action="store_true",
            required=False,
            help="Controls whether all (USAs and Broker) temp views will be created before sql execution",
        )

        parser.add_argument(
            "--result-limit",
            type=int,
            required=False,
            default=20,
            help="Maximum number of result records to display from Pyspark dataframe.",
        )

        parser.add_argument(
            "--dry-run",
            action="store_true",
            required=False,
            help="Print SQL statements without executing",
        )

    def handle(self, *args, **options):
        # Resolve parameters
        sql_input = options.get("sql")
        file_path = options.get("file")
        create_temp_views = options.get("create_temp_views")
        result_limit = options.get("result_limit")
        dry_run = options.get("dry_run")

        extra_conf = {
            # Config for Delta Lake tables and SQL. Need these to keep Delta table metadata in the metastore
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            # Old date and time values cannot be parsed without these
            "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY",  # for dates at/before 1900
            "spark.sql.legacy.parquet.int96RebaseModeInWrite": "LEGACY",  # for timestamps at/before 1900
            "spark.sql.jsonGenerator.ignoreNullFields": "false",  # keep nulls in our json
        }

        # Prepare SQL statements from either the provided string or file
        if file_path and sql_input:
            raise CommandError("Cannot use both --sql and --file. Choose one.")
        elif file_path:
            with RetrieveFileFromUri(file_path).get_file_object(text=True) as f:
                sql_statement_string = f.read()
        elif sql_input:
            sql_statement_string = sql_input
        else:
            raise CommandError("Either --sql or --file must be provided")

        sql_statements = [query.strip() for query in sqlparse.split(sql_statement_string) if query.strip()]

        logger.info(f"Found {len(sql_statements)} SQL statement(s)")

        # Prepare the Spark session after parameters have been resolved and
        # SQL statements have been identified
        self.spark = get_active_spark_session()
        spark_created_by_command = False
        if not self.spark:
            spark_created_by_command = True
            self.spark = configure_spark_session(**extra_conf, spark_context=self.spark)  # type: SparkSession

        if create_temp_views:
            create_ref_temp_views(self.spark, create_broker_views=True)

        # Execute SQL statements
        for idx, statement in enumerate(sql_statements, 1):
            logger.info(f"--- Statement {idx} ---")
            logger.info(statement)

            if dry_run:
                logger.info("[DRY RUN - Not executed]")
            else:
                try:
                    df = self.spark.sql(statement)
                    df.show(result_limit)
                    logger.info("Executed successfully")
                except Exception as e:
                    logger.info(f"Error: {e}")

        if spark_created_by_command:
            self.spark.stop()
```
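For a rough sense of how the command is meant to be invoked, here is a minimal usage sketch via Django's `call_command` (this assumes a configured Django settings module and Spark environment; the SQL string and the S3 path are illustrative, not from the PR):

```python
from django.core.management import call_command

# Dry run: logs the parsed statements without sending them to Spark
call_command("execute_spark_sql", "--sql", "SHOW DATABASES; SELECT 1;", "--dry-run")

# Execute statements from a file and show up to 50 result rows per statement
call_command(
    "execute_spark_sql",
    "--file",
    "s3://my-bucket/queries.sql",  # hypothetical URI; a local file path also works per --file's help text
    "--result-limit",
    "50",
)
```

The equivalent shell invocation would go through manage.py, e.g. `python manage.py execute_spark_sql --sql "SELECT 1;" --dry-run`.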
usaspending_api/etl/tests/data/test_execute_spark_files/empty_file.sql (empty file)
3 changes: 3 additions & 0 deletions
3
usaspending_api/etl/tests/data/test_execute_spark_files/two_queries_with_blank_line.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
```sql
SHOW DATABASES;

SELECT 1;
```
65 changes: 65 additions & 0 deletions
65
usaspending_api/etl/tests/integration/test_execute_spark_sql.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
```python
from pytest import raises

from unittest.mock import patch, MagicMock

from django.core.management import call_command
from django.core.management.base import CommandError


# Argument Validation


def test_both_sql_and_file_raises_error():
    with raises(
        CommandError,
        match="Cannot use both --sql and --file. Choose one.",
    ):
        call_command("execute_spark_sql", "--sql", "SHOW DATABASES;", "--file", "test.sql")


def test_no_sql_and_file_raises_error():
    with raises(
        CommandError,
        match="Either --sql or --file must be provided",
    ):
        call_command("execute_spark_sql")


# Integration Tests


def test_sql_functionality():
    """Tests a variety of cases in a single test for Spark performance"""

    # Test --sql approach
    with patch("usaspending_api.etl.management.commands.execute_spark_sql.logger") as mock_logger:
        call_command("execute_spark_sql", "--sql", "SHOW DATABASES; SELECT 1;")
        mock_logger.info.assert_any_call("Found 2 SQL statement(s)")

    # Test --file approach with two SQL statements and a blank line
    test_file_path = "usaspending_api/etl/tests/data/test_execute_spark_files/two_queries_with_blank_line.sql"
    with patch("usaspending_api.etl.management.commands.execute_spark_sql.logger") as mock_logger:
        call_command("execute_spark_sql", "--file", test_file_path)
        mock_logger.info.assert_any_call("Found 2 SQL statement(s)")

    # Test --file approach with an empty file
    test_file_path = "usaspending_api/etl/tests/data/test_execute_spark_files/empty_file.sql"
    with patch("usaspending_api.etl.management.commands.execute_spark_sql.logger") as mock_logger:
        call_command("execute_spark_sql", "--file", test_file_path)
        mock_logger.info.assert_any_call("Found 0 SQL statement(s)")

    # Test SQL executes
    mock_spark = MagicMock()
    with patch(
        "usaspending_api.etl.management.commands.execute_spark_sql.get_active_spark_session", return_value=mock_spark
    ):
        call_command("execute_spark_sql", "--sql", "SELECT 1;")
        mock_spark.sql.assert_called()

    # Test --dry-run does not execute SQL
    mock_spark = MagicMock()
    with patch(
        "usaspending_api.etl.management.commands.execute_spark_sql.get_active_spark_session", return_value=mock_spark
    ):
        call_command("execute_spark_sql", "--sql", "SELECT 1;", "--dry-run")
        mock_spark.sql.assert_not_called()
```
Review comment: I'm not sure this is necessary to include in the `docker-compose.yml`, since the default assumption is that the `docker-compose.yml` file is located in the project root directory.
Reply: The need for this arises when starting tests or Spark commands from a dev container. The problem is that the volume source path refers to the base OS (in our case, Windows) filesystem, not to the dev container. In my case, I need to override the `.` with the full Windows path to our project directory.