Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Minor fix in dropping covering index #2228

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImplEMR;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
Expand Down Expand Up @@ -325,7 +325,7 @@ private EMRServerlessClient createEMRServerlessClient() {
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImplEMR(awsemrServerless);
return new EmrServerlessClientImpl(awsemrServerless);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/**
* Client Interface for spark Job Submissions. Can have multiple implementations based on the
* underlying spark infrastructure. Currently, we have one for EMRServerless {@link
* EmrServerlessClientImplEMR}
* EmrServerlessClientImpl}
*/
public interface EMRServerlessClient {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EmrServerlessClientImplEMR implements EMRServerlessClient {
public class EmrServerlessClientImpl implements EMRServerlessClient {

private final AWSEMRServerless emrServerless;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImplEMR.class);
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

public EmrServerlessClientImplEMR(AWSEMRServerless emrServerless) {
public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

@Override
public String getJobIdFromFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = getIndexName(indexDetails).toLowerCase();
String indexName = getIndexName(indexDetails);

Check warning on line 19 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L19

Added line #L19 was not covered by tests
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
Expand All @@ -34,27 +34,31 @@
private String getIndexName(IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(indexDetails.getIndexType())) {
return "flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexType().getName();
String indexName =

Check warning on line 37 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L37

Added line #L37 was not covered by tests
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()

Check warning on line 40 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L40

Added line #L40 was not covered by tests
+ "_"
+ fullyQualifiedTableName.getSchemaName()

Check warning on line 42 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L42

Added line #L42 was not covered by tests
+ "_"
+ fullyQualifiedTableName.getTableName()

Check warning on line 44 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L44

Added line #L44 was not covered by tests
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();

Check warning on line 47 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L46-L47

Added lines #L46 - L47 were not covered by tests
} else if (FlintIndexType.COVERING.equals(indexDetails.getIndexType())) {
return "flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ indexDetails.getIndexName()
+ "_"
+ indexDetails.getIndexType().getName();
String indexName =

Check warning on line 49 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L49

Added line #L49 was not covered by tests
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()

Check warning on line 52 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L52

Added line #L52 was not covered by tests
+ "_"
+ fullyQualifiedTableName.getSchemaName()

Check warning on line 54 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L54

Added line #L54 was not covered by tests
+ "_"
+ fullyQualifiedTableName.getTableName()

Check warning on line 56 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L56

Added line #L56 was not covered by tests
+ "_"
+ indexDetails.getIndexName()

Check warning on line 58 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L58

Added line #L58 was not covered by tests
+ "_"
+ indexDetails.getIndexType().getSuffix();
return indexName.toLowerCase();

Check warning on line 61 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java#L60-L61

Added lines #L60 - L61 were not covered by tests
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", indexDetails.getIndexType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,19 @@

package org.opensearch.sql.spark.flint;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Enum for FlintIndex Type. */
public enum FlintIndexType {
SKIPPING("skipping_index"),
COVERING("covering_index"),
COVERING("index"),
MATERIALIZED("materialized_view");

private final String name;
private static final Map<String, FlintIndexType> ENUM_MAP;

FlintIndexType(String name) {
this.name = name;
}

public String getName() {
return this.name;
}
private final String suffix;

static {
Map<String, FlintIndexType> map = new HashMap<>();
for (FlintIndexType instance : FlintIndexType.values()) {
map.put(instance.getName().toLowerCase(), instance);
}
ENUM_MAP = Collections.unmodifiableMap(map);
FlintIndexType(String suffix) {
this.suffix = suffix;
}

public static FlintIndexType get(String name) {
return ENUM_MAP.get(name.toLowerCase());
public String getSuffix() {
return this.suffix;

Check warning on line 21 in spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexType.java#L21

Added line #L21 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void testStartJobRun() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
Expand All @@ -54,7 +54,7 @@ void testStartJobRunResultIndex() {
StartJobRunResult response = new StartJobRunResult();
when(emrServerless.startJobRun(any())).thenReturn(response);

EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.startJobRun(
new StartJobRequest(
QUERY,
Expand All @@ -74,15 +74,15 @@ void testGetJobRunState() {
GetJobRunResult response = new GetJobRunResult();
response.setJobRun(jobRun);
when(emrServerless.getJobRun(any())).thenReturn(response);
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
emrServerlessClient.getJobRunResult(EMRS_APPLICATION_ID, "123");
}

@Test
void testCancelJobRun() {
when(emrServerless.cancelJobRun(any()))
.thenReturn(new CancelJobRunResult().withJobRunId(EMR_JOB_ID));
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
CancelJobRunResult cancelJobRunResult =
emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID);
Assertions.assertEquals(EMR_JOB_ID, cancelJobRunResult.getJobRunId());
Expand All @@ -91,7 +91,7 @@ void testCancelJobRun() {
@Test
void testCancelJobRunWithValidationException() {
doThrow(new ValidationException("Error")).when(emrServerless).cancelJobRun(any());
EmrServerlessClientImplEMR emrServerlessClient = new EmrServerlessClientImplEMR(emrServerless);
EmrServerlessClientImpl emrServerlessClient = new EmrServerlessClientImpl(emrServerless);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
Expand Down
Loading