From da7f82e5a8ed669702f06fefddf854d714ed1e76 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 13 Nov 2023 10:34:34 -0800 Subject: [PATCH] Revert "Minor Refactoring (#2308) (#2317)" This reverts commit 051cc4fc42fdf106780573f613d080bc1d9a7030. --- spark/src/main/antlr/SqlBaseParser.g4 | 2 +- .../sql/spark/client/StartJobRequest.java | 2 - .../dispatcher/SparkQueryDispatcherTest.java | 329 +++++++++--------- 3 files changed, 175 insertions(+), 158 deletions(-) diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 77a9108e06..6a6d39e96c 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -967,6 +967,7 @@ primaryExpression | qualifiedName DOT ASTERISK #star | LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor | LEFT_PAREN query RIGHT_PAREN #subqueryExpression + | IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN #identifierClause | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument (COMMA argument+=functionArgument)*)? RIGHT_PAREN (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? @@ -1195,7 +1196,6 @@ qualifiedNameList functionName : IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN - | identFunc=IDENTIFIER_KW // IDENTIFIER itself is also a valid function name. | qualifiedName | FILTER | LEFT diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java index f57c8facee..c4382239a1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java @@ -7,14 +7,12 @@ import java.util.Map; import lombok.Data; -import lombok.EqualsAndHashCode; /** * This POJO carries all the fields required for emr serverless job submission. Used as model in * {@link EMRServerlessClient} interface. */ @Data -@EqualsAndHashCode public class StartJobRequest { public static final Long DEFAULT_JOB_TIMEOUT = 120L; diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 8c0ecb2ea2..ab9761da36 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -41,8 +41,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -80,8 +78,6 @@ public class SparkQueryDispatcherTest { private SparkQueryDispatcher sparkQueryDispatcher; - @Captor ArgumentCaptor startJobRequestArgumentCaptor; - @BeforeEach void setUp() { sparkQueryDispatcher = @@ -100,21 +96,19 @@ void testDispatchSelectQuery() { tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags, false, any()))) @@ -131,18 +125,23 @@ void testDispatchSelectQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:non-index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), + tags, + false, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -154,22 +153,20 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString( - "basicauth", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AUTH_USERNAME, "username"); - put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); - } - }); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + constructExpectedSparkSubmitParameterString( + "basicauth", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AUTH_USERNAME, "username"); + put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); + } + }), tags, false, any()))) @@ -186,18 +183,24 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:non-index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "basicauth", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AUTH_USERNAME, "username"); + put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); + } + }), + tags, + false, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -209,20 +212,18 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString( - "noauth", - new HashMap<>() { - { - } - }); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + constructExpectedSparkSubmitParameterString( + "noauth", + new HashMap<>() { + { + } + }), tags, false, any()))) @@ -239,18 +240,22 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:non-index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "noauth", + new HashMap<>() { + { + } + }), + tags, + false, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -267,22 +272,20 @@ void testDispatchIndexQuery() { String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; - String sparkSubmitParameters = - withStructuredStreaming( - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), tags, true, any()))) @@ -299,18 +302,24 @@ void testDispatchIndexQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - true, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -323,21 +332,19 @@ void testDispatchWithPPLQuery() { tags.put("cluster", TEST_CLUSTER_NAME); String query = "source = my_glue.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags, false, any()))) @@ -354,18 +361,23 @@ void testDispatchWithPPLQuery() { LangType.PPL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:non-index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), + tags, + false, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -378,21 +390,19 @@ void testDispatchQueryWithoutATableAndDataSourceName() { tags.put("cluster", TEST_CLUSTER_NAME); String query = "show tables"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - }); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags, false, any()))) @@ -409,18 +419,23 @@ void testDispatchQueryWithoutATableAndDataSourceName() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:non-index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), + tags, + false, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -438,22 +453,20 @@ void testDispatchIndexQueryWithoutADatasourceName() { String query = "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; - String sparkSubmitParameters = - withStructuredStreaming( - constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })); when(emrServerlessClient.startJobRun( new StartJobRequest( query, "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - sparkSubmitParameters, + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), tags, true, any()))) @@ -470,18 +483,24 @@ void testDispatchIndexQueryWithoutADatasourceName() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - StartJobRequest expected = - new StartJobRequest( - query, - "TEST_CLUSTER:index-query", - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - true, - null); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true, + any())); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); verifyNoInteractions(flintIndexMetadataReader); @@ -886,8 +905,8 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog " - + " --conf spark.flint.datasource.name=my_glue " - + authParamConfigBuilder; + + authParamConfigBuilder + + " --conf spark.flint.datasource.name=my_glue "; } private String withStructuredStreaming(String parameters) {