diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index d6463779d6..4fdd8335e1 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -94,7 +94,7 @@ import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService; import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService; import org.opensearch.sql.spark.client.EMRServerlessClient; -import org.opensearch.sql.spark.client.EmrServerlessClientImplEMR; +import org.opensearch.sql.spark.client.EmrServerlessClientImpl; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; @@ -325,7 +325,7 @@ private EMRServerlessClient createEMRServerlessClient() { .withRegion(sparkExecutionEngineConfig.getRegion()) .withCredentials(new DefaultAWSCredentialsProviderChain()) .build(); - return new EmrServerlessClientImplEMR(awsemrServerless); + return new EmrServerlessClientImpl(awsemrServerless); }); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java b/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java index 8dff8f0ea6..7e64b632ea 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EMRServerlessClient.java @@ -13,7 +13,7 @@ /** * Client Interface for spark Job Submissions. Can have multiple implementations based on the * underlying spark infrastructure. Currently, we have one for EMRServerless {@link - * EmrServerlessClientImplEMR} + * EmrServerlessClientImpl} */ public interface EMRServerlessClient { diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java similarity index 95% rename from spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java rename to spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index f0a7e76c87..335f3b6fc8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -23,12 +23,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class EmrServerlessClientImplEMR implements EMRServerlessClient { +public class EmrServerlessClientImpl implements EMRServerlessClient { private final AWSEMRServerless emrServerless; - private static final Logger logger = LogManager.getLogger(EmrServerlessClientImplEMR.class); + private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class); - public EmrServerlessClientImplEMR(AWSEMRServerless emrServerless) { + public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { this.emrServerless = emrServerless; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index ce608a8c7e..5e1f210d08 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -16,7 +16,7 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { @Override public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) { - String indexName = getIndexName(indexDetails).toLowerCase(); + String indexName = getIndexName(indexDetails); GetMappingsResponse mappingsResponse = client.admin().indices().prepareGetMappings(indexName).get(); try { @@ -34,27 +34,31 @@ public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) { private String getIndexName(IndexDetails indexDetails) { FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) { - return "flint" - + "_" - + fullyQualifiedTableName.getDatasourceName() - + "_" - + fullyQualifiedTableName.getSchemaName() - + "_" - + fullyQualifiedTableName.getTableName() - + "_" - + indexDetails.getIndexType().getName(); + String indexName = + "flint" + + "_" + + fullyQualifiedTableName.getDatasourceName() + + "_" + + fullyQualifiedTableName.getSchemaName() + + "_" + + fullyQualifiedTableName.getTableName() + + "_" + + indexDetails.getIndexType().getSuffix(); + return indexName.toLowerCase(); } else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) { - return "flint" - + "_" - + fullyQualifiedTableName.getDatasourceName() - + "_" - + fullyQualifiedTableName.getSchemaName() - + "_" - + fullyQualifiedTableName.getTableName() - + "_" - + indexDetails.getIndexName() - + "_" - + indexDetails.getIndexType().getName(); + String indexName = + "flint" + + "_" + + fullyQualifiedTableName.getDatasourceName() + + "_" + + fullyQualifiedTableName.getSchemaName() + + "_" + + fullyQualifiedTableName.getTableName() + + "_" + + indexDetails.getIndexName() + + "_" + + indexDetails.getIndexType().getSuffix(); + return indexName.toLowerCase(); } else { throw new UnsupportedOperationException( String.format("Unsupported Index Type : %s", indexDetails.getIndexType())); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java index 1415856803..432370a62f 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java @@ -5,36 +5,19 @@ package org.opensearch.sql.spark.flint; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - /** Enum for FlintIndex Type. */ public enum FlintIndexType { SKIPPING("skipping_index"), - COVERING("covering_index"), + COVERING("index"), MATERIALIZED("materialized_view"); - private final String name; - private static final Map ENUM_MAP; - - FlintIndexType(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } + private final String suffix; - static { - Map map = new HashMap<>(); - for (FlintIndexType instance : FlintIndexType.values()) { - map.put(instance.getName().toLowerCase(), instance); - } - ENUM_MAP = Collections.unmodifiableMap(map); + FlintIndexType(String suffix) { + this.suffix = suffix; } - public static FlintIndexType get(String name) { - return ENUM_MAP.get(name.toLowerCase()); + public String getSuffix() { + return this.suffix; } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index 4655584855..f874b351a9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -36,7 +36,7 @@ void testStartJobRun() { StartJobRunResult response = new StartJobRunResult(); when(emrServerless.startJobRun(any())).thenReturn(response); - EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); emrServerlessClient.startJobRun( new StartJobRequest( QUERY, @@ -54,7 +54,7 @@ void testStartJobRunResultIndex() { StartJobRunResult response = new StartJobRunResult(); when(emrServerless.startJobRun(any())).thenReturn(response); - EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); emrServerlessClient.startJobRun( new StartJobRequest( QUERY, @@ -74,7 +74,7 @@ void testGetJobRunState() { GetJobRunResult response = new GetJobRunResult(); response.setJobRun(jobRun); when(emrServerless.getJobRun(any())).thenReturn(response); - EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123"); } @@ -82,7 +82,7 @@ void testGetJobRunState() { void testCancelJobRun() { when(emrServerless.cancelJobRun(any())) .thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID)); - EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); CancelJobRunResult cancelJobRunResult = emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId()); @@ -91,7 +91,7 @@ void testCancelJobRun() { @Test void testCancelJobRunWithValidationException() { doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any()); - EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless); + EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless); IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class,