Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions historyserver/pkg/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type EventHandler struct {

ClusterTaskMap *types.ClusterTaskMap
ClusterActorMap *types.ClusterActorMap
ClusterJobMap *types.ClusterJobMap
}

// eventFilePattern matches strings that end in "-DDDD-DD-DD-DD" (D = decimal
// digit), e.g. "-2024-01-31-09". Compiled once at package scope.
// NOTE(review): presumably a date-and-hour suffix on event file names —
// confirm against the call sites, which are not visible here.
var eventFilePattern = regexp.MustCompile(`-\d{4}-\d{2}-\d{2}-\d{2}$`)
Expand All @@ -44,6 +45,9 @@ func NewEventHandler(reader storage.StorageReader) *EventHandler {
ClusterActorMap: &types.ClusterActorMap{
ClusterActorMap: make(map[string]*types.ActorMap),
},
ClusterJobMap: &types.ClusterJobMap{
ClusterJobMap: make(map[string]*types.JobMap),
},
}
}

Expand Down Expand Up @@ -525,6 +529,137 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error {
// TODO: Handle actor task definition event
// This is related to GET /api/v0/tasks (type=ACTOR_TASK)
logrus.Debugf("ACTOR_TASK_DEFINITION_EVENT received, not yet implemented")

case types.DRIVER_JOB_DEFINITION_EVENT:
jobDef, ok := eventMap["driverJobDefinitionEvent"]
if !ok {
return fmt.Errorf("event does not have 'driverJobDefinitionEvent'")
}

jsonDriverJobDefinition, err := json.Marshal(jobDef)
if err != nil {
return err
}

var currJob types.Job
if err := json.Unmarshal(jsonDriverJobDefinition, &currJob); err != nil {
return err
}

jobMap := h.ClusterJobMap.GetOrCreateJobMap(currentClusterName)
jobMap.CreateOrMergeJob(currJob.JobID, func(j *types.Job) {
// Merge job by temp storing fields that the current job cannot fill in,
// and then replace the job with current job and then fill in the stored fields
existingStateTransitions := j.StateTransitions
existingStatusTransitions := j.StatusTransitions
existingStatus := j.Status
existingStartTime := j.StartTime
existingEndTime := j.EndTime
existingMessage := j.Message
existingDriverExitCode := j.DriverExitCode

*j = currJob

if len(existingStateTransitions) > 0 {
j.StateTransitions = existingStateTransitions
j.StatusTransitions = existingStatusTransitions
j.Status = existingStatus
j.StartTime = existingStartTime
j.EndTime = existingEndTime
j.Message = existingMessage
j.DriverExitCode = existingDriverExitCode
}
})
case types.DRIVER_JOB_LIFECYCLE_EVENT:
jobLifecycleEvent, ok := eventMap["driverJobLifecycleEvent"].(map[string]any)
if !ok {
return fmt.Errorf("invalid driverJobLifecycleEvent format")
}

jobId, _ := jobLifecycleEvent["jobId"].(string)
stateTransitionUnstructed, _ := jobLifecycleEvent["stateTransitions"].([]any)

if len(stateTransitionUnstructed) == 0 || jobId == "" {
return nil
}

// TODO(chiayi): Will need to convert status timeline once it is added as well
// Following fields are related to status transition:
// - status
// - message
// - errorType
// - driverExitCode
var stateTransitions []types.JobStateTransition
for _, transition := range stateTransitionUnstructed {
tr, ok := transition.(map[string]any)
if !ok {
continue
}
state, _ := tr["state"].(string)
timestampStr, _ := tr["timestamp"].(string)

var timestamp time.Time
if timestampStr != "" {
timestamp, _ = time.Parse(time.RFC3339Nano, timestampStr)
}

stateTransitions = append(stateTransitions, types.JobStateTransition{
State: types.JobState(state),
Timestamp: timestamp,
})
}

if len(stateTransitions) == 0 {
return nil
}

jobMap := h.ClusterJobMap.GetOrCreateJobMap(currentClusterName)
jobMap.CreateOrMergeJob(jobId, func(j *types.Job) {
// TODO(chiayi): take care of status (job progress) state if part of DriverJobLifecycleEvent
j.JobID = jobId

type stateTransitionKey struct {
State string
Timestamp int64
}

existingStateKeys := make(map[stateTransitionKey]bool)
for _, t := range j.StateTransitions {
existingStateKeys[stateTransitionKey{string(t.State), t.Timestamp.UnixNano()}] = true
}

for _, t := range stateTransitions {
key := stateTransitionKey{string(t.State), t.Timestamp.UnixNano()}
if !existingStateKeys[key] {
j.StateTransitions = append(j.StateTransitions, t)
existingStateKeys[key] = true
}
}

sort.Slice(j.StateTransitions, func(i, k int) bool {
return j.StateTransitions[i].Timestamp.Before(j.StateTransitions[k].Timestamp)
})

if len(j.StateTransitions) == 0 {
return
}

lastStateTransition := j.StateTransitions[len(j.StateTransitions)-1]
j.State = lastStateTransition.State

if j.StartTime.IsZero() {
for _, t := range j.StateTransitions {
if t.State == types.CREATED {
j.StartTime = t.Timestamp
break
}
}
}

if lastStateTransition.State == types.JOBFINISHED {
j.EndTime = lastStateTransition.Timestamp
}
})
default:
logrus.Infof("Event not supported, skipping: %v", eventMap)
}
Expand Down Expand Up @@ -708,3 +843,41 @@ func (h *EventHandler) GetActorsMap(clusterName string) map[string]types.Actor {
}
return actors
}

// GetJobsMap returns a snapshot of every job recorded for clusterName, keyed
// by job ID. Each value is produced by Job.DeepCopy. When the cluster is
// unknown an empty, non-nil map is returned.
//
// The cluster-level read lock and the per-cluster job lock are both held
// while the snapshot is built.
func (h *EventHandler) GetJobsMap(clusterName string) map[string]types.Job {
	h.ClusterJobMap.RLock()
	defer h.ClusterJobMap.RUnlock()

	clusterJobs, known := h.ClusterJobMap.ClusterJobMap[clusterName]
	if !known {
		return map[string]types.Job{}
	}

	clusterJobs.Lock()
	defer clusterJobs.UnLock()

	snapshot := make(map[string]types.Job, len(clusterJobs.JobMap))
	for jobID := range clusterJobs.JobMap {
		snapshot[jobID] = clusterJobs.JobMap[jobID].DeepCopy()
	}
	return snapshot
}

// GetJobByJobID looks up a single job of clusterName by its job ID.
//
// It returns a deep copy of the stored job and true when found. When either
// the cluster or the job is unknown it returns the zero Job and false.
func (h *EventHandler) GetJobByJobID(clusterName, jobID string) (types.Job, bool) {
	h.ClusterJobMap.RLock()
	// The read lock must be released with RUnlock. Calling Unlock (the write
	// unlock) on an RWMutex that is only read-locked is a fatal runtime error.
	defer h.ClusterJobMap.RUnlock()

	jobMap, ok := h.ClusterJobMap.ClusterJobMap[clusterName]
	if !ok {
		return types.Job{}, false
	}

	jobMap.Lock()
	defer jobMap.UnLock()

	job, ok := jobMap.JobMap[jobID]
	if !ok {
		return types.Job{}, false
	}
	// Return a copy so the caller does not share the stored job's slices and maps.
	return job.DeepCopy(), true
}
178 changes: 178 additions & 0 deletions historyserver/pkg/eventserver/types/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package types

import (
"sync"
"time"
)

// JobType is the string-typed kind of a job; it is the type of Job.JobType,
// which is carried under the JSON key "type".
type JobType string

const (
	// SUBMISSION is the only JobType value declared in this file.
	SUBMISSION JobType = "SUBMISSION"
)

// JobStatus represents the job submission progress and execution status.
// It is distinct from JobState, which is declared separately in this file.
type JobStatus string

// Known JobStatus values. Two identifiers carry a JOB prefix because the
// unprefixed names are already taken elsewhere in this package; their string
// values are still the unprefixed "FAILED" and "RUNNING".
const (
	SUCCEEDED JobStatus = "SUCCEEDED"
	PENDING JobStatus = "PENDING"
	STOPPED JobStatus = "STOPPED"
	// FAILED is already used in the package, so this is named JOBFAILED.
	JOBFAILED JobStatus = "FAILED"
	// RUNNING is already used in the package, so this is named JOBRUNNING.
	JOBRUNNING JobStatus = "RUNNING"
)

// JobStatusTransition is one entry of a job's status timeline
// (Job.StatusTransitions): a JobStatus paired with a timestamp.
// NOTE(review): presumably Timestamp is when the status was entered — no
// code in view populates this type yet, so confirm once it is wired up.
type JobStatusTransition struct {
	Status JobStatus
	Timestamp time.Time
}

// DriverInfo groups the driver-related fields of a Job (Job.DriverInfo,
// JSON key "driverInfo"). All fields are plain strings, including PID.
type DriverInfo struct {
	// ID is carried under the JSON key "id".
	ID string `json:"id"`
	// NodeIPAddress is carried under the JSON key "nodeIpAddress".
	NodeIPAddress string `json:"nodeIpAddress"`
	// NodeID is carried under the JSON key "nodeId".
	NodeID string `json:"nodeId"`
	// PID is carried under the JSON key "pid" and is kept as a string.
	PID string `json:"pid"`
}

// JobState represents the literal job driver process and connectivity.
// It is distinct from JobStatus, which tracks submission/execution progress.
type JobState string

// Known JobState values.
const (
	UNSPECIFIED JobState = "UNSPECIFIED"
	CREATED JobState = "CREATED"
	// FINISHED is already used in the package, so this is named JOBFINISHED;
	// its string value is still "FINISHED".
	JOBFINISHED JobState = "FINISHED"
)

// JobStateTransition is one entry of a job's state timeline
// (Job.StateTransitions): a JobState paired with a timestamp.
// The fields carry no JSON tags.
type JobStateTransition struct {
	State JobState
	Timestamp time.Time
}

// Job is the record kept per job ID inside a JobMap. It is stored by value,
// and Job.DeepCopy is used to hand out copies.
//
// Two separate notions can change over a job's life: State (a JobState,
// with its timeline in StateTransitions) and Status (a JobStatus, with its
// timeline in StatusTransitions).
type Job struct {
	// JobID is the job's identifier; JobMap.CreateOrMergeJob keys jobs by it.
	JobID string `json:"jobId"`
	SubmissionID string `json:"submissionId"`
	// JobType is carried under the JSON key "type".
	JobType JobType `json:"type"`
	Status JobStatus `json:"status"`
	State JobState `json:"state"`
	EntryPoint string `json:"entrypoint"`
	Message string `json:"message"`
	ErrorType string `json:"errorType"`
	StartTime time.Time `json:"startTime"`
	EndTime time.Time `json:"endTime"`
	Metadata map[string]string `json:"metadata"`
	RuntimeEnv map[string]string `json:"runtimeEnv"`
	DriverInfo DriverInfo `json:"driverInfo"`
	DriverAgentHttpAddress string `json:"driverAgentHttpAddress"`
	// NOTE(review): this tag is "drivernodeId" (lowercase "n"), unlike the
	// camelCase used by the other tags. encoding/json matches object keys
	// case-insensitively when decoding, so only the encoded key name would
	// differ — confirm whether "driverNodeId" was intended.
	DriverNodeID string `json:"drivernodeId"`
	DriverExitCode int `json:"driverExitCode"`

	// StateTransitions is the state (connectivity of the driver) job timeline.
	// It has no JSON tag.
	StateTransitions []JobStateTransition

	// StatusTransitions is the status (progress) of the job timeline.
	// It has no JSON tag.
	StatusTransitions []JobStatusTransition
}

// JobMap holds the jobs of one cluster, keyed by job ID, together with the
// mutex that the Lock/UnLock methods operate on. Jobs are stored by value,
// so an update must write the modified Job back into the map.
// Because it contains a sync.Mutex, a JobMap must not be copied; use *JobMap.
type JobMap struct {
	// JobMap maps job ID to the stored Job value.
	JobMap map[string]Job
	// Mu is the mutex acquired and released by Lock and UnLock.
	Mu sync.Mutex
}

// Lock acquires j.Mu. Pair every call with UnLock.
func (j *JobMap) Lock() {
	j.Mu.Lock()
}

// UnLock releases j.Mu. Note the capital "L": the method is spelled UnLock,
// not Unlock, so *JobMap does not satisfy sync.Locker.
func (j *JobMap) UnLock() {
	j.Mu.Unlock()
}

// NewJobMap allocates a JobMap whose inner map is initialized and empty, so
// it can be written to immediately.
func NewJobMap() *JobMap {
	m := new(JobMap)
	m.JobMap = make(map[string]Job)
	return m
}

// ClusterJobMap maps a cluster name to that cluster's *JobMap, together with
// the RWMutex that the RLock/RUnlock/Lock/Unlock methods operate on.
// Because it contains a sync.RWMutex, it must not be copied; use a pointer.
type ClusterJobMap struct {
	// ClusterJobMap maps cluster name to the cluster's JobMap.
	ClusterJobMap map[string]*JobMap
	// Mu is the lock used by the methods on *ClusterJobMap.
	Mu sync.RWMutex
}

// RLock acquires c.Mu for reading. It must be released with RUnlock, not Unlock.
func (c *ClusterJobMap) RLock() {
	c.Mu.RLock()
}

// RUnlock releases a read lock on c.Mu previously taken with RLock.
func (c *ClusterJobMap) RUnlock() {
	c.Mu.RUnlock()
}

// Lock acquires c.Mu for writing. It must be released with Unlock.
func (c *ClusterJobMap) Lock() {
	c.Mu.Lock()
}

// Unlock releases the write lock on c.Mu previously taken with Lock.
// It is not the counterpart of RLock; use RUnlock for read locks.
func (c *ClusterJobMap) Unlock() {
	c.Mu.Unlock()
}

// GetOrCreateJobMap returns the JobMap registered for clusterName. If none
// exists yet, a new one is created with NewJobMap, registered, and returned.
// The write lock is held for the whole lookup-or-insert.
func (c *ClusterJobMap) GetOrCreateJobMap(clusterName string) *JobMap {
	c.Lock()
	defer c.Unlock()

	if existing, found := c.ClusterJobMap[clusterName]; found {
		return existing
	}

	created := NewJobMap()
	c.ClusterJobMap[clusterName] = created
	return created
}

// CreateOrMergeJob applies mergeFn to the job stored under jobId and writes
// the result back, all while holding the JobMap lock.
//
// If no job is stored under jobId yet, mergeFn receives a fresh Job whose
// JobID is already set to jobId. Otherwise it receives a copy of the stored
// Job. In both cases whatever mergeFn leaves in the Job is stored under jobId.
//
// A Job has two fields that can transition over time: State, which reflects
// the driver lifecycle, and Status, which reflects the progress of the job.
func (j *JobMap) CreateOrMergeJob(jobId string, mergeFn func(*Job)) {
	j.Lock()
	defer j.UnLock()

	entry, found := j.JobMap[jobId]
	if !found {
		// First event for this job: start from a Job that only knows its ID.
		entry = Job{JobID: jobId}
	}

	mergeFn(&entry)
	// Jobs are stored by value, so the modified copy has to be written back.
	j.JobMap[jobId] = entry
}

// DeepCopy returns a copy of the Job that shares no slice or map storage
// with the receiver: StateTransitions, StatusTransitions, Metadata and
// RuntimeEnv are each duplicated. All other fields are plain values and are
// copied by assignment.
//
// Nil slices and maps stay nil in the copy. Empty but non-nil ones are
// duplicated as well; guarding on length instead of nil-ness would leave
// them shared, so a write through the copy would reach the original.
func (j Job) DeepCopy() Job {
	cp := j

	if j.StateTransitions != nil {
		cp.StateTransitions = make([]JobStateTransition, len(j.StateTransitions))
		copy(cp.StateTransitions, j.StateTransitions)
	}
	if j.StatusTransitions != nil {
		cp.StatusTransitions = make([]JobStatusTransition, len(j.StatusTransitions))
		copy(cp.StatusTransitions, j.StatusTransitions)
	}
	if j.Metadata != nil {
		cp.Metadata = make(map[string]string, len(j.Metadata))
		for k, v := range j.Metadata {
			cp.Metadata[k] = v
		}
	}
	if j.RuntimeEnv != nil {
		cp.RuntimeEnv = make(map[string]string, len(j.RuntimeEnv))
		for k, v := range j.RuntimeEnv {
			cp.RuntimeEnv[k] = v
		}
	}

	return cp
}
Loading
Loading