Skip to content

Commit 45122ec

Browse files
authored
Refactor IndexDMLHandler and related classes (#2644)
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
1 parent b454a2c commit 45122ec

20 files changed

+486
-564
lines changed

plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,9 @@
7979
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
8080
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
8181
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
82-
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
8382
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
84-
import org.opensearch.sql.spark.execution.statestore.StateStore;
8583
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
84+
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
8685
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
8786
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
8887
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
@@ -227,8 +226,7 @@ public Collection<Object> createComponents(
227226
environment.settings(),
228227
dataSourceService,
229228
injector.getInstance(FlintIndexMetadataServiceImpl.class),
230-
injector.getInstance(StateStore.class),
231-
injector.getInstance(EMRServerlessClientFactory.class));
229+
injector.getInstance(FlintIndexOpFactory.class));
232230
return ImmutableList.of(
233231
dataSourceService,
234232
injector.getInstance(AsyncQueryExecutorService.class),

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import org.opensearch.common.unit.TimeValue;
2222
import org.opensearch.sql.datasource.DataSourceService;
2323
import org.opensearch.sql.datasource.model.DataSourceMetadata;
24-
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
25-
import org.opensearch.sql.spark.execution.statestore.StateStore;
2624
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
25+
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
2726
import org.opensearch.threadpool.Scheduler.Cancellable;
2827
import org.opensearch.threadpool.ThreadPool;
2928

@@ -37,8 +36,7 @@ public class ClusterManagerEventListener implements LocalNodeClusterManagerListe
3736
private Clock clock;
3837
private DataSourceService dataSourceService;
3938
private FlintIndexMetadataService flintIndexMetadataService;
40-
private StateStore stateStore;
41-
private EMRServerlessClientFactory emrServerlessClientFactory;
39+
private FlintIndexOpFactory flintIndexOpFactory;
4240
private Duration sessionTtlDuration;
4341
private Duration resultTtlDuration;
4442
private TimeValue streamingJobHouseKeepingInterval;
@@ -56,17 +54,15 @@ public ClusterManagerEventListener(
5654
Settings settings,
5755
DataSourceService dataSourceService,
5856
FlintIndexMetadataService flintIndexMetadataService,
59-
StateStore stateStore,
60-
EMRServerlessClientFactory emrServerlessClientFactory) {
57+
FlintIndexOpFactory flintIndexOpFactory) {
6158
this.clusterService = clusterService;
6259
this.threadPool = threadPool;
6360
this.client = client;
6461
this.clusterService.addLocalNodeClusterManagerListener(this);
6562
this.clock = clock;
6663
this.dataSourceService = dataSourceService;
6764
this.flintIndexMetadataService = flintIndexMetadataService;
68-
this.stateStore = stateStore;
69-
this.emrServerlessClientFactory = emrServerlessClientFactory;
65+
this.flintIndexOpFactory = flintIndexOpFactory;
7066
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
7167
this.resultTtlDuration = toDuration(resultTtl.get(settings));
7268
this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);
@@ -151,10 +147,7 @@ private void initializeStreamingJobHouseKeeperCron() {
151147
flintStreamingJobHouseKeeperCron =
152148
threadPool.scheduleWithFixedDelay(
153149
new FlintStreamingJobHouseKeeperTask(
154-
dataSourceService,
155-
flintIndexMetadataService,
156-
stateStore,
157-
emrServerlessClientFactory),
150+
dataSourceService, flintIndexMetadataService, flintIndexOpFactory),
158151
streamingJobHouseKeepingInterval,
159152
executorName());
160153
}

spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,18 @@
1717
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
1818
import org.opensearch.sql.legacy.metrics.MetricName;
1919
import org.opensearch.sql.legacy.metrics.Metrics;
20-
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
2120
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
22-
import org.opensearch.sql.spark.execution.statestore.StateStore;
2321
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
2422
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
25-
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
26-
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
23+
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
2724

2825
/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
2926
@RequiredArgsConstructor
3027
public class FlintStreamingJobHouseKeeperTask implements Runnable {
3128

3229
private final DataSourceService dataSourceService;
3330
private final FlintIndexMetadataService flintIndexMetadataService;
34-
private final StateStore stateStore;
35-
private final EMRServerlessClientFactory emrServerlessClientFactory;
31+
private final FlintIndexOpFactory flintIndexOpFactory;
3632

3733
private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);
3834
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);
@@ -95,9 +91,7 @@ private void dropAutoRefreshIndex(
9591
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
9692
// When the datasource is deleted. Possibly Replace with VACUUM Operation.
9793
LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
98-
FlintIndexOpDrop flintIndexOpDrop =
99-
new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient());
100-
flintIndexOpDrop.apply(flintIndexMetadata);
94+
flintIndexOpFactory.getDrop(datasourceName).apply(flintIndexMetadata);
10195
LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
10296
}
10397

@@ -106,14 +100,7 @@ private void alterAutoRefreshIndex(
106100
LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
107101
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
108102
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
109-
FlintIndexOpAlter flintIndexOpAlter =
110-
new FlintIndexOpAlter(
111-
flintIndexOptions,
112-
stateStore,
113-
datasourceName,
114-
emrServerlessClientFactory.getClient(),
115-
flintIndexMetadataService);
116-
flintIndexOpAlter.apply(flintIndexMetadata);
103+
flintIndexOpFactory.getAlter(flintIndexOptions, datasourceName).apply(flintIndexMetadata);
117104
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
118105
}
119106

spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java

Lines changed: 22 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
99
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
10-
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;
1110

1211
import com.amazonaws.services.emrserverless.model.JobRunState;
1312
import java.util.Map;
@@ -16,24 +15,20 @@
1615
import org.apache.logging.log4j.LogManager;
1716
import org.apache.logging.log4j.Logger;
1817
import org.json.JSONObject;
19-
import org.opensearch.client.Client;
2018
import org.opensearch.sql.datasource.model.DataSourceMetadata;
2119
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
2220
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
23-
import org.opensearch.sql.spark.client.EMRServerlessClient;
2421
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
2522
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
2623
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
2724
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
2825
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
2926
import org.opensearch.sql.spark.execution.statement.StatementState;
30-
import org.opensearch.sql.spark.execution.statestore.StateStore;
3127
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
3228
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
29+
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
3330
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
34-
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
35-
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
36-
import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
31+
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
3732
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
3833

3934
/** Handle Index DML query. includes * DROP * ALT? */
@@ -45,15 +40,10 @@ public class IndexDMLHandler extends AsyncQueryHandler {
4540
public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
4641
public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";
4742

48-
private final EMRServerlessClient emrServerlessClient;
49-
5043
private final JobExecutionResponseReader jobExecutionResponseReader;
51-
5244
private final FlintIndexMetadataService flintIndexMetadataService;
53-
54-
private final StateStore stateStore;
55-
56-
private final Client client;
45+
private final IndexDMLResultStorageService indexDMLResultStorageService;
46+
private final FlintIndexOpFactory flintIndexOpFactory;
5747

5848
public static boolean isIndexDMLQuery(String jobId) {
5949
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
@@ -67,14 +57,16 @@ public DispatchQueryResponse submit(
6757
try {
6858
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
6959
FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
70-
executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);
60+
61+
getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);
62+
7163
AsyncQueryId asyncQueryId =
7264
storeIndexDMLResult(
7365
dispatchQueryRequest,
7466
dataSourceMetadata,
7567
JobRunState.SUCCESS.toString(),
7668
StringUtils.EMPTY,
77-
startTime);
69+
getElapsedTimeSince(startTime));
7870
return new DispatchQueryResponse(
7971
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
8072
} catch (Exception e) {
@@ -85,7 +77,7 @@ public DispatchQueryResponse submit(
8577
dataSourceMetadata,
8678
JobRunState.FAILED.toString(),
8779
e.getMessage(),
88-
startTime);
80+
getElapsedTimeSince(startTime));
8981
return new DispatchQueryResponse(
9082
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
9183
}
@@ -96,46 +88,34 @@ private AsyncQueryId storeIndexDMLResult(
9688
DataSourceMetadata dataSourceMetadata,
9789
String status,
9890
String error,
99-
long startTime) {
91+
long queryRunTime) {
10092
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
10193
IndexDMLResult indexDMLResult =
10294
new IndexDMLResult(
10395
asyncQueryId.getId(),
10496
status,
10597
error,
10698
dispatchQueryRequest.getDatasource(),
107-
System.currentTimeMillis() - startTime,
99+
queryRunTime,
108100
System.currentTimeMillis());
109-
createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
101+
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName());
110102
return asyncQueryId;
111103
}
112104

113-
private void executeIndexOp(
114-
DispatchQueryRequest dispatchQueryRequest,
115-
IndexQueryDetails indexQueryDetails,
116-
FlintIndexMetadata indexMetadata) {
105+
private long getElapsedTimeSince(long startTime) {
106+
return System.currentTimeMillis() - startTime;
107+
}
108+
109+
private FlintIndexOp getIndexOp(
110+
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
117111
switch (indexQueryDetails.getIndexQueryActionType()) {
118112
case DROP:
119-
FlintIndexOp dropOp =
120-
new FlintIndexOpDrop(
121-
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
122-
dropOp.apply(indexMetadata);
123-
break;
113+
return flintIndexOpFactory.getDrop(dispatchQueryRequest.getDatasource());
124114
case ALTER:
125-
FlintIndexOpAlter flintIndexOpAlter =
126-
new FlintIndexOpAlter(
127-
indexQueryDetails.getFlintIndexOptions(),
128-
stateStore,
129-
dispatchQueryRequest.getDatasource(),
130-
emrServerlessClient,
131-
flintIndexMetadataService);
132-
flintIndexOpAlter.apply(indexMetadata);
133-
break;
115+
return flintIndexOpFactory.getAlter(
116+
indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
134117
case VACUUM:
135-
FlintIndexOp indexVacuumOp =
136-
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
137-
indexVacuumOp.apply(indexMetadata);
138-
break;
118+
return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
139119
default:
140120
throw new IllegalStateException(
141121
String.format(

spark/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
package org.opensearch.sql.spark.dispatcher;
77

88
import lombok.RequiredArgsConstructor;
9-
import org.opensearch.client.Client;
109
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
1110
import org.opensearch.sql.spark.execution.session.SessionManager;
12-
import org.opensearch.sql.spark.execution.statestore.StateStore;
1311
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
12+
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
13+
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
1414
import org.opensearch.sql.spark.leasemanager.LeaseManager;
1515
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
1616

@@ -19,19 +19,19 @@ public class QueryHandlerFactory {
1919

2020
private final JobExecutionResponseReader jobExecutionResponseReader;
2121
private final FlintIndexMetadataService flintIndexMetadataService;
22-
private final Client client;
2322
private final SessionManager sessionManager;
2423
private final LeaseManager leaseManager;
25-
private final StateStore stateStore;
24+
private final IndexDMLResultStorageService indexDMLResultStorageService;
25+
private final FlintIndexOpFactory flintIndexOpFactory;
2626
private final EMRServerlessClientFactory emrServerlessClientFactory;
2727

2828
public RefreshQueryHandler getRefreshQueryHandler() {
2929
return new RefreshQueryHandler(
3030
emrServerlessClientFactory.getClient(),
3131
jobExecutionResponseReader,
3232
flintIndexMetadataService,
33-
stateStore,
34-
leaseManager);
33+
leaseManager,
34+
flintIndexOpFactory);
3535
}
3636

3737
public StreamingQueryHandler getStreamingQueryHandler() {
@@ -50,10 +50,9 @@ public InteractiveQueryHandler getInteractiveQueryHandler() {
5050

5151
public IndexDMLHandler getIndexDMLHandler() {
5252
return new IndexDMLHandler(
53-
emrServerlessClientFactory.getClient(),
5453
jobExecutionResponseReader,
5554
flintIndexMetadataService,
56-
stateStore,
57-
client);
55+
indexDMLResultStorageService,
56+
flintIndexOpFactory);
5857
}
5958
}

spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,28 @@
1313
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
1414
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
1515
import org.opensearch.sql.spark.dispatcher.model.JobType;
16-
import org.opensearch.sql.spark.execution.statestore.StateStore;
1716
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
1817
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
1918
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
20-
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
19+
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
2120
import org.opensearch.sql.spark.leasemanager.LeaseManager;
2221
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
2322

2423
/** Handle Refresh Query. */
2524
public class RefreshQueryHandler extends BatchQueryHandler {
2625

2726
private final FlintIndexMetadataService flintIndexMetadataService;
28-
private final StateStore stateStore;
29-
private final EMRServerlessClient emrServerlessClient;
27+
private final FlintIndexOpFactory flintIndexOpFactory;
3028

3129
public RefreshQueryHandler(
3230
EMRServerlessClient emrServerlessClient,
3331
JobExecutionResponseReader jobExecutionResponseReader,
3432
FlintIndexMetadataService flintIndexMetadataService,
35-
StateStore stateStore,
36-
LeaseManager leaseManager) {
33+
LeaseManager leaseManager,
34+
FlintIndexOpFactory flintIndexOpFactory) {
3735
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
3836
this.flintIndexMetadataService = flintIndexMetadataService;
39-
this.stateStore = stateStore;
40-
this.emrServerlessClient = emrServerlessClient;
37+
this.flintIndexOpFactory = flintIndexOpFactory;
4138
}
4239

4340
@Override
@@ -51,8 +48,7 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
5148
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
5249
}
5350
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
54-
FlintIndexOp jobCancelOp =
55-
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
51+
FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
5652
jobCancelOp.apply(indexMetadata);
5753
return asyncQueryJobMetadata.getQueryId().getId();
5854
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.spark.flint;
7+
8+
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
9+
10+
public interface IndexDMLResultStorageService {
11+
IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName);
12+
}

0 commit comments

Comments
 (0)