Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import static org.opensearch.sql.spark.execution.session.SessionState.END_STATE;
import static org.opensearch.sql.spark.execution.session.SessionState.FAIL;
import static org.opensearch.sql.spark.execution.statement.StatementId.newStatementId;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createSession;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession;

import java.util.Optional;
import lombok.Builder;
Expand All @@ -24,7 +22,8 @@
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

Expand All @@ -41,7 +40,8 @@ public class InteractiveSession implements Session {
public static final String SESSION_ID_TAG_KEY = "sid";

private final SessionId sessionId;
private final StateStore stateStore;
private final SessionStorageService sessionStorageService;
private final StatementStorageService statementStorageService;
private final EMRServerlessClient serverlessClient;
private SessionModel sessionModel;
// the threshold of elapsed time in milliseconds before we say a session is stale
Expand All @@ -64,7 +64,7 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
createSession(stateStore, sessionModel.getDatasourceName()).apply(sessionModel);
sessionStorageService.createSession(sessionModel, sessionModel.getDatasourceName());
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
Expand All @@ -76,7 +76,7 @@ public void open(CreateSessionRequest createSessionRequest) {
@Override
public void close() {
Optional<SessionModel> model =
getSession(stateStore, sessionModel.getDatasourceName()).apply(sessionModel.getId());
sessionStorageService.getSession(sessionModel.getId(), sessionModel.getDatasourceName());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
Expand All @@ -88,7 +88,7 @@ public void close() {
/** Submit statement. If submit successfully, Statement in waiting state. */
public StatementId submit(QueryRequest request) {
Optional<SessionModel> model =
getSession(stateStore, sessionModel.getDatasourceName()).apply(sessionModel.getId());
sessionStorageService.getSession(sessionModel.getId(), sessionModel.getDatasourceName());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
Expand All @@ -101,7 +101,7 @@ public StatementId submit(QueryRequest request) {
.sessionId(sessionId)
.applicationId(sessionModel.getApplicationId())
.jobId(sessionModel.getJobId())
.stateStore(stateStore)
.statementStorageService(statementStorageService)
.statementId(statementId)
.langType(LangType.SQL)
.datasourceName(sessionModel.getDatasourceName())
Expand All @@ -124,8 +124,8 @@ public StatementId submit(QueryRequest request) {

@Override
public Optional<Statement> get(StatementId stID) {
return StateStore.getStatement(stateStore, sessionModel.getDatasourceName())
.apply(stID.getId())
return statementStorageService
.getStatement(stID.getId(), sessionModel.getDatasourceName())
.map(
model ->
Statement.builder()
Expand All @@ -136,7 +136,7 @@ public Optional<Statement> get(StatementId stID) {
.langType(model.getLangType())
.query(model.getQuery())
.queryId(model.getQueryId())
.stateStore(stateStore)
.statementStorageService(statementStorageService)
.statementModel(model)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,31 @@
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.utils.RealTimeProvider;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final StateStore stateStore;
private final SessionStorageService sessionStorageService;
private final StatementStorageService statementStorageService;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private Settings settings;

public SessionManager(
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory,
Settings settings) {
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.settings = settings;
}
private final Settings settings;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId(request.getDatasourceName()))
.stateStore(stateStore)
.sessionStorageService(sessionStorageService)
.statementStorageService(statementStorageService)
.serverlessClient(emrServerlessClientFactory.getClient())
.build();
session.open(request);
Expand All @@ -64,12 +60,13 @@ public Session createSession(CreateSessionRequest request) {
*/
public Optional<Session> getSession(SessionId sid, String dataSourceName) {
Optional<SessionModel> model =
StateStore.getSession(stateStore, dataSourceName).apply(sid.getSessionId());
sessionStorageService.getSession(sid.getSessionId(), dataSourceName);
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.stateStore(stateStore)
.sessionStorageService(sessionStorageService)
.statementStorageService(statementStorageService)
.serverlessClient(emrServerlessClientFactory.getClient())
.sessionModel(model.get())
.sessionInactivityTimeoutMilli(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.execution.statement.StatementModel.submitStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState;

import lombok.Builder;
import lombok.Getter;
Expand All @@ -18,7 +15,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;

/** Statement represent query to execute in session. One statement map to one session. */
Expand All @@ -35,7 +32,7 @@ public class Statement {
private final String datasourceName;
private final String query;
private final String queryId;
private final StateStore stateStore;
private final StatementStorageService statementStorageService;

@Setter private StatementModel statementModel;

Expand All @@ -52,7 +49,7 @@ public void open() {
datasourceName,
query,
queryId);
statementModel = createStatement(stateStore, datasourceName).apply(statementModel);
statementModel = statementStorageService.createStatement(statementModel, datasourceName);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
Expand All @@ -76,17 +73,17 @@ public void cancel() {
}
try {
this.statementModel =
updateStatementState(stateStore, statementModel.getDatasourceName())
.apply(this.statementModel, StatementState.CANCELLED);
statementStorageService.updateStatementState(
statementModel, StatementState.CANCELLED, statementModel.getDatasourceName());
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
getStatement(stateStore, statementModel.getDatasourceName())
.apply(statementModel.getId())
statementStorageService
.getStatement(statementModel.getId(), statementModel.getDatasourceName())
.orElse(this.statementModel);
String errorMsg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;

@RequiredArgsConstructor
public class OpenSearchSessionStorageService implements SessionStorageService {

private final StateStore stateStore;

@Override
public SessionModel createSession(SessionModel sessionModel, String datasourceName) {
return stateStore.create(
sessionModel, SessionModel::of, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public Optional<SessionModel> getSession(String id, String datasourceName) {
return stateStore.get(
id, SessionModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName) {
return stateStore.updateState(
sessionModel,
sessionState,
SessionModel::copyWithState,
DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;

@RequiredArgsConstructor
public class OpenSearchStatementStorageService implements StatementStorageService {

private final StateStore stateStore;

@Override
public StatementModel createStatement(StatementModel statementModel, String datasourceName) {
return stateStore.create(
statementModel, StatementModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public Optional<StatementModel> getStatement(String id, String datasourceName) {
return stateStore.get(
id, StatementModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState, String datasourceName) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import java.util.Optional;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;

/** Interface for accessing {@link SessionModel} data storage. */
public interface SessionStorageService {

SessionModel createSession(SessionModel sessionModel, String datasourceName);

Optional<SessionModel> getSession(String id, String datasourceName);

SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName);
}
Loading