Skip to content

Commit

Permalink
add InteractiveSession and SessionManager (#2290)
Browse files Browse the repository at this point in the history
* add InteractiveSession and SessionManager

Signed-off-by: Peng Huo <penghuo@gmail.com>

* address comments

Signed-off-by: Peng Huo <penghuo@gmail.com>

---------

Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo authored Oct 13, 2023
1 parent b86cf2f commit f856cb3
Show file tree
Hide file tree
Showing 15 changed files with 834 additions and 5 deletions.
39 changes: 34 additions & 5 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,47 @@ dependencies {
api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation(platform("org.junit:junit-bom:5.6.2"))

testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
testImplementation 'junit:junit:4.13.1'
testImplementation "org.opensearch.test:framework:${opensearch_version}"

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.platform:junit-platform-launcher") {
because 'allows tests to run from IDEs that bundle older version of launcher'
}
testImplementation("org.opensearch.test:framework:${opensearch_version}")
}

test {
useJUnitPlatform()
useJUnitPlatform {
includeEngines("junit-jupiter")
}
testLogging {
events "failed"
exceptionFormat "full"
}
}
task junit4(type: Test) {
useJUnitPlatform {
includeEngines("junit-vintage")
}
systemProperty 'tests.security.manager', 'false'
testLogging {
events "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
dependsOn test, junit4
executionData test, junit4
reports {
html.enabled true
xml.enabled true
Expand All @@ -78,9 +103,10 @@ jacocoTestReport {
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
dependsOn test, junit4
executionData test, junit4
violationRules {
rule {
element = 'CLASS'
Expand All @@ -92,6 +118,9 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.SessionStateStore',
'org.opensearch.sql.spark.execution.session.SessionModel'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.Data;
import org.opensearch.sql.spark.client.StartJobRequest;

@Data
public class CreateSessionRequest {
private final StartJobRequest startJobRequest;
private final String datasourceName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession;

import java.util.Optional;
import lombok.Builder;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Interactive session.
*
* <p>ENTRY_STATE: not_started
*/
@Getter
@Builder
public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final SessionStateStore sessionStateStore;
private final EMRServerlessClient serverlessClient;

private SessionModel sessionModel;

@Override
public void open(CreateSessionRequest createSessionRequest) {
try {
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();

sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStateStore.create(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
public void close() {
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId());
if (model.isEmpty()) {
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
void open(CreateSessionRequest createSessionRequest);

/** close session. */
void close();

SessionModel getSessionModel();

SessionId getSessionId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.Data;
import org.apache.commons.lang3.RandomStringUtils;

@Data
public class SessionId {
private final String sessionId;

public static SessionId newSessionId() {
return new SessionId(RandomStringUtils.random(10, true, true));
}

@Override
public String toString() {
return "sessionId=" + sessionId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final SessionStateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId())
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.build();
session.open(request);
return session;
}

public Optional<Session> getSession(SessionId sid) {
Optional<SessionModel> model = stateStore.get(sid);
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.build();
return Optional.ofNullable(session);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel implements ToXContentObject {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
public static final String SESSION_ID = "sessionId";
public static final String SESSION_STATE = "state";
public static final String DATASOURCE_NAME = "dataSourceName";
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String APPLICATION_ID = "applicationId";
public static final String JOB_ID = "jobId";
public static final String ERROR = "error";
public static final String UNKNOWN = "unknown";
public static final String SESSION_DOC_TYPE = "session";

private final String version;
private final SessionType sessionType;
private final SessionId sessionId;
private final SessionState sessionState;
private final String applicationId;
private final String jobId;
private final String datasourceName;
private final String error;
private final long lastUpdateTime;

private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(copy.sessionState)
.datasourceName(copy.datasourceName)
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
}

@SneakyThrows
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
.version("1.0")
.sessionType(INTERACTIVE)
.sessionId(sid)
.sessionState(NOT_STARTED)
.datasourceName(datasourceName)
.applicationId(applicationId)
.jobId(jobId)
.error(UNKNOWN)
.lastUpdateTime(System.currentTimeMillis())
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}
}
Loading

0 comments on commit f856cb3

Please sign in to comment.