Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
eb1ec0b
add cuid column
bobbai00 Apr 26, 2025
fa7a680
add computing unit conf
bobbai00 Apr 26, 2025
30128f9
add columns to computing unit
bobbai00 Apr 28, 2025
e1bcde1
initial version
bobbai00 Apr 28, 2025
0805248
finish 1st version
bobbai00 Apr 28, 2025
c753960
finish the frontend refinement
bobbai00 Apr 28, 2025
c5ccbde
add computing unit id to the execution request
bobbai00 Apr 29, 2025
d638798
incorporate with cuid changes
bobbai00 May 7, 2025
218e7bc
add cuid to request
bobbai00 May 7, 2025
e7322e0
fix helm
bobbai00 May 8, 2025
3634a64
recover the changes of the image pull policy
bobbai00 May 8, 2025
d7b7276
add texera latest ddl
bobbai00 May 8, 2025
9929446
fix table order
bobbai00 May 8, 2025
5fcccdf
init refactor frontend first version
bobbai00 May 8, 2025
def0af7
finish the first version
bobbai00 May 9, 2025
8583dcf
finish the first version+
bobbai00 May 9, 2025
bdbff1c
finish the status management
bobbai00 May 10, 2025
2105910
refactor websocket with subject
bobbai00 May 11, 2025
1bca2e2
add ways to prevent flashing items
bobbai00 May 11, 2025
f9a7bd5
workable version
bobbai00 May 11, 2025
600cee9
clean more state
bobbai00 May 11, 2025
98b4470
do more cleanup
bobbai00 May 11, 2025
e053ab4
fix the termination bug
bobbai00 May 11, 2025
2cae62a
keep simplifying frontend code
bobbai00 May 11, 2025
0b05c97
refactor state
bobbai00 May 11, 2025
18dc1b8
add cuid polling
bobbai00 May 11, 2025
c7f7d03
finish fix of cu polling
bobbai00 May 12, 2025
5294ca8
add cuid to latest execution identifier and result export request
bobbai00 May 12, 2025
052ce94
add cuid to the execution history display
bobbai00 May 12, 2025
076c106
recover the image names
bobbai00 May 12, 2025
b5f095d
recover the image names
bobbai00 May 12, 2025
823038e
add the header back
bobbai00 May 12, 2025
ad0f67e
add the header
bobbai00 May 12, 2025
cab06ee
prevent workflow from editing
bobbai00 May 12, 2025
5788ff8
recover some unexpected changes
bobbai00 May 13, 2025
191a1a5
try to fix the test
bobbai00 May 13, 2025
faa23af
try to fix the test
bobbai00 May 15, 2025
0c53b38
add license
bobbai00 May 15, 2025
ee78472
recover the test
bobbai00 May 15, 2025
56eda5c
revert some unexpected changes
bobbai00 May 19, 2025
7821ce8
resolve the comments
bobbai00 May 19, 2025
60a23b6
keep improving the frontend
bobbai00 May 19, 2025
15a99d8
merge with latest master
bobbai00 May 26, 2025
336d1ce
make the auto selection work
bobbai00 May 26, 2025
b484c88
add filter for list computing units for kubernetes
bobbai00 May 26, 2025
ba533ce
add filter for list computing units for kubernetes
bobbai00 May 26, 2025
9dfabed
avoid hard code the url of cu for local
bobbai00 May 26, 2025
0a9ff4d
align the backend more with the frontend
bobbai00 May 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import com.fasterxml.jackson.module.noctordeser.NoCtorDeserModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
import org.jooq.DSLContext
import org.jooq.impl.DSL

import java.nio.file.{Files, Path, Paths}
import java.text.SimpleDateFormat
Expand Down Expand Up @@ -121,6 +119,23 @@ object Utils extends LazyLogging {
}
}

def stringToAggregatedState(str: String): WorkflowAggregatedState = {
str.trim.toLowerCase match {
case "uninitialized" => WorkflowAggregatedState.UNINITIALIZED
case "ready" => WorkflowAggregatedState.READY
case "initializing" => WorkflowAggregatedState.READY // accept alias
case "running" => WorkflowAggregatedState.RUNNING
case "pausing" => WorkflowAggregatedState.PAUSING
case "paused" => WorkflowAggregatedState.PAUSED
case "resuming" => WorkflowAggregatedState.RESUMING
case "completed" => WorkflowAggregatedState.COMPLETED
case "failed" => WorkflowAggregatedState.FAILED
case "killed" => WorkflowAggregatedState.KILLED
case "unknown" => WorkflowAggregatedState.UNKNOWN
case other => throw new IllegalArgumentException(s"Unrecognized state: $other")
}
}

/**
* @param state indicates the workflow state
* @return code indicates the status of the execution in the DB it is 0 by default for any unused states.
Expand Down Expand Up @@ -151,15 +166,4 @@ object Utils extends LazyLogging {
lock.unlock()
}
}

def withTransaction[T](dsl: DSLContext)(block: DSLContext => T): T = {
var result: Option[T] = None

dsl.transaction(configuration => {
val ctx = DSL.using(configuration)
result = Some(block(ctx))
})

result.getOrElse(throw new RuntimeException("Transaction failed without result!"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ case class ResultExportRequest(
rowIndex: Int, // used by "data" export
columnIndex: Int, // used by "data" export
filename: String, // optional filename override
destination: String // "dataset" or "local"
destination: String, // "dataset" or "local"
// TODO: remove it once the lifecycle of result and compute are unbundled
computingUnitId: Int // the id of the computing unit
)
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ case class WorkflowExecuteRequest(
logicalPlan: LogicalPlanPojo,
replayFromExecution: Option[ReplayExecutionInfo], // contains execution Id, interaction Id.
workflowSettings: WorkflowSettings,
emailNotificationEnabled: Boolean
emailNotificationEnabled: Boolean,
computingUnitId: Int
) extends TexeraWebSocketRequest

case class LogicalPlanPojo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ class WorkflowWebsocketResource extends LazyLogging {
val sessionState = new SessionState(session)
SessionState.setState(session.getId, sessionState)
val wid = session.getRequestParameterMap.get("wid").get(0).toLong
val cuid = session.getRequestParameterMap.get("cuid").get(0).toInt
// hack to refresh frontend run button state
sessionState.send(WorkflowStateEvent("Uninitialized"))
val workflowState =
WorkflowService.getOrCreate(WorkflowIdentity(wid))
WorkflowService.getOrCreate(WorkflowIdentity(wid), cuid)
sessionState.subscribe(workflowState)
sessionState.send(ClusterStatusUpdateEvent(ClusterListener.numWorkerNodesInCluster))
logger.info("connection open")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import edu.uci.ics.amber.core.virtualidentity._
import edu.uci.ics.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord}
import edu.uci.ics.amber.engine.common.AmberConfig
import edu.uci.ics.amber.engine.common.Utils.{maptoStatusCode, stringToAggregatedState}
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.jooq.generated.Tables._
import edu.uci.ics.texera.dao.jooq.generated.tables.daos.WorkflowExecutionsDao
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowExecutions
import edu.uci.ics.texera.auth.SessionUser
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.web.model.http.request.result.ResultExportRequest
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource._
import edu.uci.ics.texera.web.service.{ExecutionsMetadataPersistService, ResultExportService}
Expand Down Expand Up @@ -83,13 +85,13 @@ object WorkflowExecutionsResource {
* @param wid workflow id
* @return Integer
*/
def getLatestExecutionID(wid: Integer): Option[Integer] = {
def getLatestExecutionID(wid: Integer, cuid: Integer): Option[Integer] = {
val executions = context
.select(WORKFLOW_EXECUTIONS.EID)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID))
.where(WORKFLOW_VERSION.WID.eq(wid))
.where(WORKFLOW_VERSION.WID.eq(wid).and(WORKFLOW_EXECUTIONS.CUID.eq(cuid)))
.fetchInto(classOf[Integer])
.asScala
.toList
Expand Down Expand Up @@ -196,11 +198,24 @@ object WorkflowExecutionsResource {
.map(URI.create)
else None

def getWorkflowExecutions(wid: Integer, context: DSLContext): List[WorkflowExecutionEntry] = {
def getWorkflowExecutions(
wid: Integer,
context: DSLContext,
statusCodes: Set[Byte] = Set.empty
): List[WorkflowExecutionEntry] = {
var condition = WORKFLOW_VERSION.WID.eq(wid)

if (statusCodes.nonEmpty) {
condition = condition.and(
WORKFLOW_EXECUTIONS.STATUS.in(statusCodes.map(Byte.box).asJava)
)
}

context
.select(
WORKFLOW_EXECUTIONS.EID,
WORKFLOW_EXECUTIONS.VID,
WORKFLOW_EXECUTIONS.CUID,
USER.NAME,
USER.GOOGLE_AVATAR,
WORKFLOW_EXECUTIONS.STATUS,
Expand All @@ -216,7 +231,7 @@ object WorkflowExecutionsResource {
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.join(USER)
.on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
.where(WORKFLOW_VERSION.WID.eq(wid))
.where(condition)
.orderBy(WORKFLOW_EXECUTIONS.EID.desc())
.fetchInto(classOf[WorkflowExecutionEntry])
.asScala
Expand Down Expand Up @@ -426,6 +441,7 @@ object WorkflowExecutionsResource {
case class WorkflowExecutionEntry(
eId: Integer,
vId: Integer,
cuId: Integer,
userName: String,
googleAvatar: String,
status: Byte,
Expand Down Expand Up @@ -466,6 +482,56 @@ case class ExecutionRenameRequest(wid: Integer, eId: Integer, executionName: Str
@Path("/executions")
class WorkflowExecutionsResource {

@GET
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/{wid}/latest")
@RolesAllowed(Array("REGULAR", "ADMIN"))
def retrieveLatestExecutionEntry(
@PathParam("wid") wid: Integer,
@Auth sessionUser: SessionUser
): WorkflowExecutionEntry = {

validateUserCanAccessWorkflow(sessionUser.getUser.getUid, wid)

withTransaction(context) { ctx =>
val latestEntryOpt =
ctx
.select(
WORKFLOW_EXECUTIONS.EID,
WORKFLOW_EXECUTIONS.VID,
WORKFLOW_EXECUTIONS.CUID,
USER.NAME,
USER.GOOGLE_AVATAR,
WORKFLOW_EXECUTIONS.STATUS,
WORKFLOW_EXECUTIONS.RESULT,
WORKFLOW_EXECUTIONS.STARTING_TIME,
WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME,
WORKFLOW_EXECUTIONS.BOOKMARKED,
WORKFLOW_EXECUTIONS.NAME,
WORKFLOW_EXECUTIONS.LOG_LOCATION
)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.join(USER)
.on(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
.where(WORKFLOW_VERSION.WID.eq(wid))
// sort by latest VID first, then latest start-time
.orderBy(
WORKFLOW_EXECUTIONS.VID.desc(),
WORKFLOW_EXECUTIONS.EID.desc()
)
.limit(1)
.fetchInto(classOf[WorkflowExecutionEntry])
.asScala
.headOption

latestEntryOpt.getOrElse {
throw new ForbiddenException("Executions doesn't exist")
}
}
}

@GET
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/{wid}/interactions/{eid}")
Expand Down Expand Up @@ -513,19 +579,34 @@ class WorkflowExecutionsResource {
@RolesAllowed(Array("REGULAR", "ADMIN"))
def retrieveExecutionsOfWorkflow(
@PathParam("wid") wid: Integer,
@Auth sessionUser: SessionUser
@Auth sessionUser: SessionUser,
@QueryParam("status") status: String
): List[WorkflowExecutionEntry] = {
val user = sessionUser.getUser
if (!WorkflowAccessResource.hasReadAccess(wid, user.getUid)) {
List()
} else {
getWorkflowExecutions(wid, context)
val statusCodes: Set[Byte] =
Option(status)
.map(_.trim)
.filter(_.nonEmpty)
.map { raw =>
val tokens = raw.split(',').map(_.trim.toLowerCase).filter(_.nonEmpty)
try {
tokens.map(stringToAggregatedState).map(maptoStatusCode).toSet
} catch {
case e: IllegalArgumentException =>
throw new BadRequestException(e.getMessage)
}
}
.getOrElse(Set.empty[Byte])
getWorkflowExecutions(wid, context, statusCodes)
}
}

@GET
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/{wid}/{eid}")
@Path("/{wid}/stats/{eid}")
def retrieveWorkflowRuntimeStatistics(
@PathParam("wid") wid: Integer,
@PathParam("eid") eid: Integer
Expand Down Expand Up @@ -662,7 +743,8 @@ class WorkflowExecutionsResource {
case "local" =>
// CASE A: multiple operators => produce ZIP
if (request.operators.size > 1) {
val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
val resultExportService =
new ResultExportService(WorkflowIdentity(request.workflowId), request.computingUnitId)
val (zipStream, zipFileNameOpt) =
resultExportService.exportOperatorsAsZip(request)

Expand All @@ -687,7 +769,8 @@ class WorkflowExecutionsResource {
}
val singleOp = request.operators.head

val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
val resultExportService =
new ResultExportService(WorkflowIdentity(request.workflowId), request.computingUnitId)
val (streamingOutput, fileNameOpt) =
resultExportService.exportOperatorResultAsStream(request, singleOp)

Expand All @@ -706,7 +789,8 @@ class WorkflowExecutionsResource {
.build()
case _ =>
// destination = "dataset" by default
val resultExportService = new ResultExportService(WorkflowIdentity(request.workflowId))
val resultExportService =
new ResultExportService(WorkflowIdentity(request.workflowId), request.computingUnitId)
val exportResponse =
resultExportService.exportAllOperatorsResultToDataset(user.user, request)
Response.ok(exportResponse).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ object ExecutionResultService {
*/
class ExecutionResultService(
workflowIdentity: WorkflowIdentity,
computingUnitId: Int,
val workflowStateStore: WorkflowStateStore
) extends SubscriptionManager
with LazyLogging {
Expand Down Expand Up @@ -425,7 +426,7 @@ class ExecutionResultService(
def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
// calculate from index (pageIndex starts from 1 instead of 0)
val from = request.pageSize * (request.pageIndex - 1)
val latestExecutionId = getLatestExecutionId(workflowIdentity).getOrElse(
val latestExecutionId = getLatestExecutionId(workflowIdentity, computingUnitId).getOrElse(
throw new IllegalStateException("No execution is recorded")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ object ExecutionsMetadataPersistService extends LazyLogging {
workflowId: WorkflowIdentity,
uid: Option[Integer],
executionName: String,
environmentVersion: String
environmentVersion: String,
computingUnitId: Integer
): ExecutionIdentity = {
if (!AmberConfig.isUserSystemEnabled) return DEFAULT_EXECUTION_ID
// first retrieve the latest version of this workflow
Expand All @@ -67,6 +68,10 @@ object ExecutionsMetadataPersistService extends LazyLogging {
newExecution.setUid(uid.orNull)
newExecution.setStartingTime(new Timestamp(System.currentTimeMillis()))
newExecution.setEnvironmentVersion(environmentVersion)

// Set computing unit ID if provided
newExecution.setCuid(computingUnitId)

workflowExecutionsDao.insert(newExecution)
ExecutionIdentity(newExecution.getEid.longValue())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ object ResultExportService {
.trim
}

class ResultExportService(workflowIdentity: WorkflowIdentity) {
class ResultExportService(workflowIdentity: WorkflowIdentity, computingUnitId: Int) {

import ResultExportService._

Expand Down Expand Up @@ -127,11 +127,11 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
operatorRequest: OperatorExportInfo
): (Option[String], Option[String]) = {

val execIdOpt = getLatestExecutionId(workflowIdentity)
if (execIdOpt.isEmpty)
val execIdOpt = getLatestExecutionId(workflowIdentity, computingUnitId)
if (execIdOpt.isEmpty) {
return (None, Some(s"Workflow ${request.workflowId} has no execution result"))

val operatorDocument = getOperatorDocument(operatorRequest.id)
}
val operatorDocument = getOperatorDocument(operatorRequest.id, computingUnitId)
if (operatorDocument == null || operatorDocument.getCount == 0)
return (None, Some(s"No results to export for operator $operatorRequest"))

Expand Down Expand Up @@ -162,12 +162,12 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
request: ResultExportRequest,
operatorRequest: OperatorExportInfo
): (StreamingOutput, Option[String]) = {
val execIdOpt = getLatestExecutionId(workflowIdentity)
val execIdOpt = getLatestExecutionId(workflowIdentity, computingUnitId)
if (execIdOpt.isEmpty) {
return (null, None)
}

val operatorDocument = getOperatorDocument(operatorRequest.id)
val operatorDocument = getOperatorDocument(operatorRequest.id, computingUnitId)
if (operatorDocument == null || operatorDocument.getCount == 0) {
return (null, None)
}
Expand Down Expand Up @@ -210,7 +210,7 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"))
val zipFileName = s"${request.workflowName}-$timestamp.zip"

val execIdOpt = getLatestExecutionId(workflowIdentity)
val execIdOpt = getLatestExecutionId(workflowIdentity, computingUnitId)
if (execIdOpt.isEmpty) {
throw new WebApplicationException(
s"No execution result for workflow ${request.workflowId}"
Expand All @@ -221,7 +221,7 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
override def write(outputStream: OutputStream): Unit = {
Using.resource(new ZipOutputStream(outputStream)) { zipOut =>
request.operators.foreach { op =>
val operatorDocument = getOperatorDocument(op.id)
val operatorDocument = getOperatorDocument(op.id, computingUnitId)
if (operatorDocument == null || operatorDocument.getCount == 0) {
// create an "empty" file for this operator
zipOut.putNextEntry(new ZipEntry(s"${op.id}-empty.txt"))
Expand Down Expand Up @@ -425,11 +425,14 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
* Generate the VirtualDocument for one operator's result.
* Incorporates the remote code's extra parameter `None` for sub-operator ID.
*/
private def getOperatorDocument(operatorId: String): VirtualDocument[Tuple] = {
private def getOperatorDocument(
operatorId: String,
computingUnitId: Int
): VirtualDocument[Tuple] = {
// By now the workflow should finish running
// Only supports external port 0 for now. TODO: support multiple ports
val storageUri = WorkflowExecutionsResource.getResultUriByLogicalPortId(
getLatestExecutionId(workflowIdentity).get,
getLatestExecutionId(workflowIdentity, computingUnitId).get,
OperatorIdentity(operatorId),
PortIdentity()
)
Expand Down
Loading