Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions app/mainapp/code_editor/runcode/run_code_file_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ func RunCodeFile(workerGroup string, fileID string, envID string, pipelineID str
return models.CodeRun{}, errors.New(rerror)
}

parentfolderdata := ""
parentfolderdata := envID + "/coderun/" + pipelineID + "/" + nodeID
var err error
if filesdata.FolderID != "" {
parentfolderdata, err = filesystem.FolderConstructByID(database.DBConn, filesdata.FolderID, envID, "pipelines")
if err != nil {
return models.CodeRun{}, errors.New("File record not found")
}
} else {
return models.CodeRun{}, errors.New("File record not found")
}
// if filesdata.FolderID != "" {

// // The folder structure will look like <environment ID>/coderun/<pipeline ID>/<node ID>
// parentfolderdata, err = filesystem.NodeLevelFolderConstructByID(database.DBConn, filesdata.FolderID, envID)
// // parentfolderdata, err = filesystem.FolderConstructByID(database.DBConn, filesdata.FolderID, envID, "pipelines")

// log.Println("parent folder code run:", parentfolderdata)

// if err != nil {
// return models.CodeRun{}, errors.New("File record not found")
// }
// } else {
// return models.CodeRun{}, errors.New("File record not found")
// }

commands = append(commands, "python3 -u ${{nodedirectory}}"+filesdata.FileName)
runSend, err = RunCodeServerWorker(envID, nodeID, workerGroup, runid, commands, filesdata, parentfolderdata, filesdata.FolderID, replayRunID)
Expand Down
1 change: 1 addition & 0 deletions app/mainapp/database/models/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ type CodeFilesCacheOutput struct {
FolderID string `json:"folder_id"`
FileName string `json:"file_name"`
ChecksumMD5 string `json:"checksum_md5"`
Level string `json:"level"`
FileStore []byte `gorm:"type:bytea;" json:"file_store"`
}

Expand Down
52 changes: 26 additions & 26 deletions app/mainapp/database/models/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,26 @@ func (WorkerTasks) TableName() string {
}

type WorkerTasks struct {
TaskID string `gorm:"PRIMARY_KEY;type:varchar(48);" json:"task_id"`
CreatedAt time.Time `json:"created_at"`
EnvironmentID string `json:"environment_id"`
RunID string `gorm:"index:idx_task_runid;index:idx_task_nodeid;" json:"run_id"`
RunType string `json:"run_type"`
WorkerGroup string `json:"worker_group"`
WorkerID string `json:"worker_id"`
WorkerType string `json:"worker_type"`
PipelineID string `json:"pipeline_id"`
NodeID string `gorm:"index:idx_task_nodeid;" json:"node_id"`
Folder string `json:"folder"`
FolderID string `json:"folder_id"`
Dependency datatypes.JSON `json:"dependency"`
Destination datatypes.JSON `json:"destination"`
StartDT time.Time `json:"start_dt"`
EndDT time.Time `json:"end_dt"`
Status string `json:"status"`
Reason string `json:"reason"`
Commands datatypes.JSON `json:"commands"`
Version string `json:"version"`
TaskID string `gorm:"PRIMARY_KEY;type:varchar(48);" json:"task_id"`
CreatedAt time.Time `json:"created_at"`
EnvironmentID string `json:"environment_id"`
RunID string `gorm:"index:idx_task_runid;index:idx_task_nodeid;" json:"run_id"`
RunType string `json:"run_type"`
WorkerGroup string `json:"worker_group"`
WorkerID string `json:"worker_id"`
WorkerType string `json:"worker_type"`
PipelineID string `json:"pipeline_id"`
NodeID string `gorm:"index:idx_task_nodeid;" json:"node_id"`
// Folder string `json:"folder"`
// FolderID string `json:"folder_id"`
Dependency datatypes.JSON `json:"dependency"`
Destination datatypes.JSON `json:"destination"`
StartDT time.Time `json:"start_dt"`
EndDT time.Time `json:"end_dt"`
Status string `json:"status"`
Reason string `json:"reason"`
Commands datatypes.JSON `json:"commands"`
Version string `json:"version"`
}

func (WorkerTaskLock) IsEntity() {}
Expand All @@ -61,12 +61,12 @@ type WorkerTaskSend struct {
RunID string `json:"run_id"`
WorkerGroup string `json:"worker_group"`
WorkerID string `json:"worker_id"`
Folder string `json:"folder"`
FolderID string `json:"folder_id"`
Commands []string `json:"commands"`
Version string `json:"version"`
RunType string `json:"run_type"`
InputData bool `json:"input_data"`
// Folder string `json:"folder"`
// FolderID string `json:"folder_id"`
Commands []string `json:"commands"`
Version string `json:"version"`
RunType string `json:"run_type"`
InputData bool `json:"input_data"`
}

type WorkerPipelineNext struct {
Expand Down
6 changes: 2 additions & 4 deletions app/mainapp/pipelines/rundeployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func RunDeployment(pipelineID string, environmentID string, runID string, versio
foldersdata = <-folders
parentfolderdata := <-parentfolder

// log.Println("parent folder", parentfolderdata)
log.Println("parent folder", parentfolderdata)

// Map children
for _, s := range edgesdata {
Expand Down Expand Up @@ -218,8 +218,6 @@ func RunDeployment(pipelineID string, environmentID string, runID string, versio
Dependency: dependJSON,
Commands: s.Commands,
Destination: destinationJSON,
Folder: folderMap[s.NodeID],
FolderID: folderNodeMap[s.NodeID],
RunType: "deployment",
Version: s.Version,
}
Expand Down Expand Up @@ -289,7 +287,7 @@ func RunDeployment(pipelineID string, environmentID string, runID string, versio
// }
// err = worker.WorkerRunTask("python_1", triggerData[s].TaskID, RunID, environmentID, pipelineID, s, []string{"sleep " + strconv.Itoa(x) + "; echo " + s})

err = worker.WorkerRunTask(triggerData[s].WorkerGroup, triggerData[s].TaskID, RunID, environmentID, pipelineID, s, commandsend, folderMap[triggerData[s].NodeID], folderNodeMap[triggerData[s].NodeID], triggerData[s].Version, "deployment", triggerData[s].WorkerType, inputData)
err = worker.WorkerRunTask(triggerData[s].WorkerGroup, triggerData[s].TaskID, RunID, environmentID, pipelineID, s, commandsend, triggerData[s].Version, "deployment", triggerData[s].WorkerType, inputData)
if err != nil {
if dpconfig.Debug == "true" {
logging.PrintSecretsRedact(err)
Expand Down
2 changes: 1 addition & 1 deletion app/mainapp/pipelines/runnext_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func RunNext(msg models.WorkerTaskSend) {
}

// ------ run the destination -------
err = worker.WorkerRunTask(s.WorkerGroup, s.TaskID, s.RunID, s.EnvironmentID, s.PipelineID, s.NodeID, commandsend, s.Folder, s.FolderID, s.Version, s.RunType, s.WorkerType, msg.InputData)
err = worker.WorkerRunTask(s.WorkerGroup, s.TaskID, s.RunID, s.EnvironmentID, s.PipelineID, s.NodeID, commandsend, s.Version, s.RunType, s.WorkerType, msg.InputData)
// err = worker.WorkerRunTask("python_1", triggerData[s].TaskID, RunID, environmentID, pipelineID, s, []string{"echo " + s})
if err != nil {
if dpconfig.Debug == "true" {
Expand Down
154 changes: 63 additions & 91 deletions app/mainapp/pipelines/runpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package pipelines
import (
"encoding/json"
"log"
"os"
"time"

"github.com/dataplane-app/dataplane/app/mainapp/code_editor/filesystem"
dpconfig "github.com/dataplane-app/dataplane/app/mainapp/config"
"github.com/dataplane-app/dataplane/app/mainapp/database"
"github.com/dataplane-app/dataplane/app/mainapp/database/models"
Expand All @@ -26,8 +24,8 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
// start := time.Now().UTC()

// Doesnt require concurrency safety, should be written / read in sequence.
var destinations = make(map[string][]string)
var dependencies = make(map[string][]string)
// var destinations = make(map[string][]string)
// var dependencies = make(map[string][]string)
var triggerData = make(map[string]*models.WorkerTasks)

// Retrieve pipeline details
Expand All @@ -44,29 +42,6 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
// Input data from trigger
var inputData bool = false

// Check if a runJson is submitted
// if runJson != nil && len(runJson[0]) != 0 {

// inputData = true
// run := models.PipelineApiTriggerRuns{
// RunID: runID,
// PipelineID: pipelineID,
// EnvironmentID: environmentID,
// RunType: "pipeline",
// RunJSON: runJson[0],
// CreatedAt: time.Now().UTC(),
// }

// err = database.DBConn.Create(&run).Error
// if err != nil {

// if dpconfig.Debug == "true" {
// logging.PrintSecretsRedact(err)
// }
// return models.PipelineRuns{}, err
// }
// }

// Create a run
run := models.PipelineRuns{
RunID: runID,
Expand All @@ -89,21 +64,21 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
}

// ------ Obtain folders
folders := make(chan []models.CodeFolders)
parentfolder := make(chan string)
foldersdata := []models.CodeFolders{}
// folders := make(chan []models.CodeFolders)
// // parentfolder := make(chan string)
// foldersdata := []models.CodeFolders{}

go func() {
database.DBConn.Where("pipeline_id = ? and environment_id =? and level = ?", pipelineID, environmentID, "node").Find(&foldersdata)
folders <- foldersdata
// go func() {
// database.DBConn.Where("pipeline_id = ? and environment_id =? and level = ?", pipelineID, environmentID, "node").Find(&foldersdata)
// folders <- foldersdata

pf := ""
// // pf := ""

if len(foldersdata) > 0 {
pf, _ = filesystem.FolderConstructByID(database.DBConn, foldersdata[0].ParentID, environmentID, "pipelines")
}
parentfolder <- pf
}()
// // if len(foldersdata) > 0 {
// // pf, _ = filesystem.FolderConstructByID(database.DBConn, foldersdata[0].ParentID, environmentID, "pipelines")
// // }
// // parentfolder <- pf
// }()

// Chart a course
nodes := make(chan []*models.PipelineNodes)
Expand All @@ -115,12 +90,12 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
nodes <- nodesdata
}()

edges := make(chan []*models.PipelineEdges)
edgesdata := []*models.PipelineEdges{}
go func() {
database.DBConn.Where("pipeline_id = ? and environment_id =?", pipelineID, environmentID).Find(&edgesdata)
edges <- edgesdata
}()
// edges := make(chan []*models.PipelineEdges)
// edgesdata := []*models.PipelineEdges{}
// go func() {
// database.DBConn.Select("from", "to").Where("pipeline_id = ? and environment_id =?", pipelineID, environmentID).Find(&edgesdata)
// edges <- edgesdata
// }()

// Start at trigger
RunID := run.RunID
Expand All @@ -129,42 +104,48 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.

// Return go routines
nodesdata = <-nodes
edgesdata = <-edges
foldersdata = <-folders
parentfolderdata := <-parentfolder
// edgesdata = <-edges
// foldersdata = <-folders
// parentfolderdata := environmentID + "/pipelines/" + pipelineID
// <-parentfolder

// The folder structure will look like <environment ID>/pipelines/<pipeline ID>
// log.Println("parent folder", parentfolderdata)

// Map children
for _, s := range edgesdata {
// for _, s := range edgesdata {

destinations[s.From] = append(destinations[s.From], s.To)
dependencies[s.To] = append(dependencies[s.To], s.From)
// destinations[s.From] = append(destinations[s.From], s.To)
// dependencies[s.To] = append(dependencies[s.To], s.From)

}
// }

// Map folder structure:
var folderMap = make(map[string]string)
var folderNodeMap = make(map[string]string)
for _, f := range foldersdata {
// var folderMap = make(map[string]string)
// var folderNodeMap = make(map[string]string)
// for _, f := range foldersdata {

if f.Level == "node" {
// if f.Level == "node" {

dir := parentfolderdata + f.FolderID + "_" + f.FolderName
// log.Println(dir)
// dir := parentfolderdata + f.NodeID
// // + "/" + f.FolderID
// // + "_" + f.FolderName
// // log.Println("Dir:", dir)

folderMap[f.NodeID] = dir
folderNodeMap[f.NodeID] = f.FolderID
if dpconfig.Debug == "true" {
if _, err := os.Stat(dpconfig.CodeDirectory + dir); os.IsExist(err) {
log.Println("Dir exists:", dpconfig.CodeDirectory+dir)
// folderMap[f.NodeID] = dir

}
}
// // ---- The Folder ID is needed to see the root of the folder structure, otherwise it must recursively go through the folder leves that are not root.
// folderNodeMap[f.NodeID] = f.FolderID
// if dpconfig.Debug == "true" {
// if _, err := os.Stat(dpconfig.CodeDirectory + dir); os.IsExist(err) {
// log.Println("Dir exists:", dpconfig.CodeDirectory+dir)

}
// }
// }

}
// }

// }

var course []*models.WorkerTasks
var trigger []string
Expand Down Expand Up @@ -206,15 +187,15 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
triggerID = s.NodeID
}

dependJSON, err := json.Marshal(dependencies[s.NodeID])
if err != nil {
logging.PrintSecretsRedact(err)
}
// dependJSON, err := json.Marshal(dependencies[s.NodeID])
// if err != nil {
// logging.PrintSecretsRedact(err)
// }

destinationJSON, err := json.Marshal(destinations[s.NodeID])
if err != nil {
logging.PrintSecretsRedact(err)
}
// destinationJSON, err := json.Marshal(destinations[s.NodeID])
// if err != nil {
// logging.PrintSecretsRedact(err)
// }

addTask := &models.WorkerTasks{
TaskID: uuid.NewString(),
Expand All @@ -226,12 +207,12 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
PipelineID: s.PipelineID,
NodeID: s.NodeID,
Status: status,
Dependency: dependJSON,
Dependency: s.Dependency,
Commands: s.Commands,
Destination: destinationJSON,
Folder: folderMap[s.NodeID],
FolderID: folderNodeMap[s.NodeID],
RunType: "pipeline",
Destination: s.Destination,
// Folder: folderMap[s.NodeID],
// FolderID: folderNodeMap[s.NodeID],
RunType: "pipeline",
}

if nodeType == "start" {
Expand All @@ -242,6 +223,7 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.

}

// This is to queue the tasks for run - sends Queue status to frontend
errnat := messageq.MsgSend("taskupdate."+environmentID+"."+RunID, addTask)
if errnat != nil {
if dpconfig.Debug == "true" {
Expand Down Expand Up @@ -293,18 +275,8 @@ func RunPipeline(pipelineID string, environmentID string, runID string) (models.
commandsend = append(commandsend, c.Command)
}

// log.Println("Commands:", commandsend)

// log.Println("Pipeline run id:", runID, RunID)

// log.Println("First:", s)
// if x == 2 {
// ex = "exit 1;"
// }
// err = worker.WorkerRunTask("python_1", triggerData[s].TaskID, RunID, environmentID, pipelineID, s, []string{"sleep " + strconv.Itoa(x) + "; echo " + s})
// log.Println("Worker type:", triggerData[s].WorkerType)
/* Start the first task */
err = worker.WorkerRunTask(triggerData[s].WorkerGroup, triggerData[s].TaskID, RunID, environmentID, pipelineID, s, commandsend, folderMap[triggerData[s].NodeID], folderNodeMap[triggerData[s].NodeID], "", "pipeline", triggerData[s].WorkerType, inputData)
err = worker.WorkerRunTask(triggerData[s].WorkerGroup, triggerData[s].TaskID, RunID, environmentID, pipelineID, s, commandsend, "", "pipeline", triggerData[s].WorkerType, inputData)
if err != nil {
if dpconfig.Debug == "true" {
logging.PrintSecretsRedact(err)
Expand Down
Loading