Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ private boolean checkContainer() {
Optional<DockerContainer> maybeContainer = DockerContainer.getContainerByUUID( ConfigManager.getInstance().getConfig( CONFIG_CONTAINER_KEY ).getString() );
if ( maybeContainer.isPresent() ) {
this.container = maybeContainer.get();
// just because the container exists does not mean it is running
if ( !testConnection() ) {
this.container = null;
return false;
}
onContainerRunning();
return true;
}
Expand Down Expand Up @@ -127,6 +132,19 @@ private void getDockerInstances( Context ctx ) {
}


private void getDockerInstance( Context ctx ) {
if ( container != null ) {
Optional<DockerInstance> dockerInstance = DockerManager.getInstance().getInstanceForContainer( container.getContainerId() );
if ( dockerInstance.isPresent() ) {
DockerInstanceInfo info = dockerInstance.get().getInfo();
ctx.status( 200 ).json( info );
return;
}
}
ctx.status( 200 );
}


/**
* Deploys the docker container with polypheny-jupyter-server image.
* For storing the notebooks in the Polypheny Home directory, a bind mount is used.
Expand Down Expand Up @@ -154,6 +172,15 @@ private boolean createContainer( DockerInstance dockerInstance ) {
log.info( "Jupyter Server container has been deployed" );
} else {
this.container = maybeContainer.get();
// ensure the existing container is actually running
if ( !testConnection() ) {
this.container.start();
if ( !this.container.waitTillStarted( this::testConnection, 20000 ) ) {
this.container.destroy();
this.container = null;
throw new GenericRuntimeException( "Failed to start Jupyter Server container" );
}
}
}
onContainerRunning();
return true;
Expand Down Expand Up @@ -263,6 +290,7 @@ private void registerEndpoints() {
}, HandlerType.GET );
server.addSerializedRoute( PATH + "/export/<path>", fs::export, HandlerType.GET );
server.addSerializedRoute( PATH + "/connections", proxyOrEmpty( proxy -> proxy::openConnections ), HandlerType.GET );
server.addSerializedRoute( PATH + "/container/getDockerInstance", this::getDockerInstance, HandlerType.GET );
server.addSerializedRoute( PATH + "/container/getDockerInstances", this::getDockerInstances, HandlerType.GET );

server.addSerializedRoute( PATH + "/contents/<parentPath>", fs::createFile, HandlerType.POST );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ public JupyterKernel( String kernelId, String name, Builder builder, String host
this.name = name;
this.clientId = UUID.randomUUID().toString();
this.kernelLanguage = JupyterLanguageFactory.getKernelLanguage( name );

this.supportsPolyCells = this.kernelLanguage != null;

String url = "ws://" + host + "/api/kernels/" + this.kernelId + "/channels?session_id=" + clientId;

this.webSocket = builder.buildAsync( URI.create( url ), new WebSocketClient() ).join();

this.statusMsg = new JsonObject();
this.statusMsg.addProperty( "msg_type", "status" );
JsonObject content = new JsonObject();
content.addProperty( "execution_state", "starting" );
this.statusMsg.add( "content", content );

String url = "ws://" + host + "/api/kernels/" + this.kernelId + "/channels?session_id=" + clientId;

this.webSocket = builder.buildAsync( URI.create( url ), new WebSocketClient() ).join();

sendInitCode();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
import org.polypheny.db.webui.HttpServer;
import org.polypheny.db.workflow.dag.activities.ActivityRegistry;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowDefModel.IdentifiedWorflowDefModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.repo.WorkflowRepo;
import org.polypheny.db.workflow.repo.WorkflowRepo.WorkflowRepoException;
import org.polypheny.db.workflow.session.SessionManager;

public class WorkflowApi {

private final SessionManager sessionManager;
private final WorkflowRepo repo;
public static final String PATH = WorkflowManager.PATH + "/api";


public WorkflowApi( SessionManager sessionManager ) {
public WorkflowApi( SessionManager sessionManager, WorkflowRepo repo ) {
this.sessionManager = sessionManager;
this.repo = repo;
}


Expand All @@ -53,12 +58,15 @@ public void registerEndpoints( HttpServer server ) {
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/{activityId}/{outIndex}", this::getIntermediaryResult, HandlerType.GET ); // queryParam: limit = null
server.addSerializedRoute( PATH + "/registry", this::getActivityRegistry, HandlerType.GET ); // queryParam: array = false
server.addSerializedRoute( PATH + "/registry/{activityType}", this::getActivityDef, HandlerType.GET );
server.addSerializedRoute( PATH + "/workflows", this::getWorkflowDefs, HandlerType.GET );
server.addSerializedRoute( PATH + "/workflows/{workflowId}/{version}", this::getStoredWorkflow, HandlerType.GET );

server.addSerializedRoute( PATH + "/sessions", this::createSession, HandlerType.POST ); // queryParam: execute = false <- if true: immediately execute workflow
server.addSerializedRoute( PATH + "/sessions/{sessionId}/execute", this::execute, HandlerType.POST ); // queryParam: target = null
server.addSerializedRoute( PATH + "/sessions/{sessionId}/reset", this::reset, HandlerType.POST ); // queryParam: target = null
server.addSerializedRoute( PATH + "/sessions/{sessionId}/interrupt", this::interrupt, HandlerType.POST );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/config", this::setWorkflowConfig, HandlerType.POST );
server.addSerializedRoute( PATH + "/workflows/{workflowId}/{version}/open", this::createSessionFromStoredWorkflow, HandlerType.POST );

server.addSerializedRoute( PATH + "/sessions", this::terminateSessions, HandlerType.DELETE );
server.addSerializedRoute( PATH + "/sessions/{sessionId}", this::terminateSession, HandlerType.DELETE );
Expand All @@ -67,7 +75,7 @@ public void registerEndpoints( HttpServer server ) {


private void getSessions( final Context ctx ) {
process( ctx, sessionManager::getApiSessionModels );
process( ctx, () -> sessionManager.getApiSessionModels().values() );
}


Expand Down Expand Up @@ -156,6 +164,32 @@ private void getActivityDef( final Context ctx ) {
}


private void getWorkflowDefs( final Context ctx ) {
process( ctx, () -> {
try {
return repo.getWorkflowDefs().entrySet().stream()
.map( e -> new IdentifiedWorflowDefModel( e.getValue(), e.getKey() ) )
.toList();
} catch ( WorkflowRepoException e ) {
throw new WorkflowApiException( e.getMessage(), e.getErrorCode() );
}
} );
}


private void getStoredWorkflow( final Context ctx ) {
UUID workflowId = UUID.fromString( ctx.pathParam( "workflowId" ) );
int version = Integer.parseInt( ctx.pathParam( "version" ) );
process( ctx, () -> {
try {
return repo.readVersion( workflowId, version );
} catch ( WorkflowRepoException e ) {
throw new WorkflowApiException( e.getMessage(), e.getErrorCode() );
}
} );
}


private void createSession( final Context ctx ) {
WorkflowModel workflowModel = ctx.bodyAsClass( WorkflowModel.class );
boolean execute = getQueryParam( ctx, "execute", false );
Expand Down Expand Up @@ -210,6 +244,26 @@ private void setWorkflowConfig( final Context ctx ) {
}


private void createSessionFromStoredWorkflow( final Context ctx ) {
UUID workflowId = UUID.fromString( ctx.pathParam( "workflowId" ) );
int version = Integer.parseInt( ctx.pathParam( "version" ) );
boolean execute = getQueryParam( ctx, "execute", false );
process( ctx, () -> {
try {
WorkflowModel workflowModel = repo.readVersion( workflowId, version );
UUID sessionId = sessionManager.createApiSession( workflowModel );
if ( execute ) {
sessionManager.getApiSessionOrThrow( sessionId ).execute( null );
}
return sessionId;
} catch ( WorkflowRepoException e ) {
throw new WorkflowApiException( e.getMessage(), e.getErrorCode() );
}
} );

}


private void terminateSessions( final Context ctx ) {
process( ctx, sessionManager::terminateApiSessions );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public WorkflowManager( TransactionManager tm ) {
repo = WorkflowRepoImpl.getInstance();
sessionManager = new SessionManager( tm );
registerEndpoints( sessionManager );
apiManager = new WorkflowApi( sessionManager );
apiManager = new WorkflowApi( sessionManager, repo );
apiManager.registerEndpoints( HttpServer.getInstance() );
jobManager = new JobManager( sessionManager );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -101,4 +102,21 @@ public String toString() {

}


/**
* This class can be used for serialization where the workflowId needs to be included
*/
@Getter
public static class IdentifiedWorflowDefModel extends WorkflowDefModel {

private final UUID workflowId;


public IdentifiedWorflowDefModel( WorkflowDefModel defModel, UUID workflowId ) {
super( defModel.getName(), defModel.getGroup(), defModel.getDescription(), defModel.getVersions() );
this.workflowId = workflowId;
}

}

}