@@ -132,18 +132,6 @@ private void executeIndexOp(
flintIndexOpAlter.apply(indexMetadata);
break;
case VACUUM:
// Try to perform drop operation first
FlintIndexOp tryDropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
try {
tryDropOp.apply(indexMetadata);
} catch (IllegalStateException e) {
// Drop failed possibly due to invalid initial state
}

// Continue to delete index data physically if state is DELETED
// which means previous transaction succeeds
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
@@ -19,14 +19,14 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.ValidationException;
import com.google.common.collect.Lists;
import java.util.Base64;
import java.util.List;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob;
@@ -63,22 +63,15 @@ public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec {
.isSpecialCharacter(true));

@Test
public void shouldVacuumIndexInRefreshingState() {
public void shouldVacuumIndexInDeletedState() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(REFRESHING),
List.of(DELETED),
List.of(
// Happy case that there is job running
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"))),
// Cancel EMR-S job, but not job running
Pair.<EMRApiCall, EMRApiCall>of(
() -> {
throw new ValidationException("Job run is not in a cancellable state");
},
DEFAULT_OP)));
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")))));

runVacuumTestSuite(
testCases,
@@ -90,32 +83,11 @@ public void shouldVacuumIndexInRefreshingState() {
}
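A side note on the stubbed EMR-S response used above: new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")) is the fake API result the vacuum flow sees when it polls the job state. A minimal standalone sketch, assuming only the aws-java-sdk-emrserverless model classes on the classpath (the class name below is hypothetical):

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;

public class EmrJobRunStubSketch {
  public static void main(String[] args) {
    // Build the same stubbed response the test returns from getJobRunResult:
    // a job run already in "Cancelled" state, which lets the vacuum flow proceed.
    GetJobRunResult stubbed =
        new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"));
    System.out.println(stubbed.getJobRun().getState()); // prints "Cancelled"
  }
}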

@Test
public void shouldNotVacuumIndexInRefreshingStateIfCancelTimeout() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(REFRESHING),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Running")))));

runVacuumTestSuite(
testCases,
(mockDS, response) -> {
assertEquals("FAILED", response.getStatus());
assertEquals("Cancel job operation timed out.", response.getError());
assertTrue(indexExists(mockDS.indexName));
assertTrue(indexDocExists(mockDS.latestId));
});
}

@Test
public void shouldNotVacuumIndexInVacuumingState() {
public void shouldNotVacuumIndexInOtherStates() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(VACUUMING),
List.of(EMPTY, CREATING, ACTIVE, REFRESHING, VACUUMING),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
() -> {
@@ -134,39 +106,29 @@ public void shouldNotVacuumIndexInVacuumingState() {
});
}

@Test
public void shouldVacuumIndexWithoutJobRunning() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(EMPTY, CREATING, ACTIVE, DELETED),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")))));

runVacuumTestSuite(
testCases,
(mockDS, response) -> {
assertEquals("SUCCESS", response.getStatus());
assertFalse(flintIndexExists(mockDS.indexName));
assertFalse(indexDocExists(mockDS.latestId));
});
}

private void runVacuumTestSuite(
List<List<Object>> testCases,
BiConsumer<FlintDatasetMock, AsyncQueryExecutionResponse> assertion) {
testCases.forEach(
params -> {
FlintDatasetMock mockDS = (FlintDatasetMock) params.get(0);
FlintIndexState state = (FlintIndexState) params.get(1);
EMRApiCall cancelJobRun = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getLeft();
EMRApiCall getJobRunResult = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getRight();

AsyncQueryExecutionResponse response =
runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult);
assertion.accept(mockDS, response);
try {
FlintIndexState state = (FlintIndexState) params.get(1);
EMRApiCall cancelJobRun = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getLeft();
EMRApiCall getJobRunResult = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getRight();

AsyncQueryExecutionResponse response =
runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult);
assertion.accept(mockDS, response);
} finally {
// Clean up because we simulate a parameterized test in a single unit test method
if (flintIndexExists(mockDS.indexName)) {
mockDS.deleteIndex();
}
if (indexDocExists(mockDS.latestId)) {
deleteIndexDoc(mockDS.latestId);
}
}
});
}
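For context on the harness above: Lists.cartesianProduct is what expands the dataset, index-state, and EMR-S behaviour lists into the positional params consumed via params.get(0), params.get(1), and params.get(2). A small self-contained sketch with illustrative stand-in values (the strings below are not the real fixtures):

import com.google.common.collect.Lists;
import java.util.List;

public class VacuumTestCaseMatrixSketch {
  public static void main(String[] args) {
    // Stand-ins for the real inputs: datasets, Flint index states, EMR-S behaviours.
    List<String> datasets = List.of("skipping", "covering", "mv");
    List<String> states = List.of("EMPTY", "CREATING", "ACTIVE", "REFRESHING", "VACUUMING");
    List<String> emrBehaviours = List.of("cancelThrows+getJobDefault");

    // Every combination becomes one test case; each inner list is read
    // positionally, mirroring params.get(0..2) in runVacuumTestSuite.
    List<List<String>> testCases = Lists.cartesianProduct(datasets, states, emrBehaviours);
    testCases.forEach(System.out::println); // 3 x 5 x 1 = 15 combinations
  }
}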

@@ -229,6 +191,10 @@ private boolean indexDocExists(String docId) {
.isExists();
}

private void deleteIndexDoc(String docId) {
client.delete(new DeleteRequest(DATASOURCE_TO_REQUEST_INDEX.apply("mys3"), docId)).actionGet();
}

private FlintDatasetMock mockDataset(String query, FlintIndexType indexType, String indexName) {
FlintDatasetMock dataset = new FlintDatasetMock(query, "", indexType, indexName);
dataset.latestId(Base64.getEncoder().encodeToString(indexName.getBytes()));
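One last note on the mock dataset: the latest-state document ID that indexDocExists() checks and deleteIndexDoc() removes is simply the Base64 encoding of the Flint index name, per the latestId(...) call above. A minimal sketch with a hypothetical index name:

import java.util.Base64;

public class LatestIdSketch {
  public static void main(String[] args) {
    // Hypothetical index name; the real values come from FLINT_TEST_DATASETS.
    String indexName = "flint_mys3_default_http_logs_skipping_index";
    String latestId = Base64.getEncoder().encodeToString(indexName.getBytes());
    // This is the doc ID that indexDocExists() looks up and deleteIndexDoc() deletes.
    System.out.println(latestId);
  }
}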