Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 28 additions & 33 deletions playground/backend/internal/code_processing/code_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"context"
"fmt"
"github.com/google/uuid"
"io"
"os/exec"
"time"
)
Expand All @@ -51,7 +52,6 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
}(lc)

errorChannel := make(chan error, 1)
dataChannel := make(chan interface{}, 1)
successChannel := make(chan bool, 1)
cancelChannel := make(chan bool, 1)

Expand Down Expand Up @@ -81,19 +81,11 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
// compile
logger.Infof("%s: Compile() ...\n", pipelineId)
compileCmd := executor.Compile(ctxWithTimeout)
go func(cmd *exec.Cmd, successChannel chan bool, errChannel chan error, dataChannel chan interface{}) {
// TODO separate stderr from stdout [BEAM-13208]
data, err := cmd.CombinedOutput()
dataChannel <- data
if err != nil {
errChannel <- err
successChannel <- false
} else {
successChannel <- true
}
}(compileCmd, successChannel, errorChannel, dataChannel)
var compileError bytes.Buffer
var compileOutput bytes.Buffer
runCmdWithOutput(compileCmd, &compileOutput, &compileError, successChannel, errorChannel)

if err := processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, dataChannel, nil, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
if err := processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, &compileOutput, &compileError, errorChannel, pb.Status_STATUS_COMPILE_ERROR, pb.Status_STATUS_EXECUTING); err != nil {
return
}

Expand All @@ -110,20 +102,9 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
// run
logger.Infof("%s: Run() ...\n", pipelineId)
runCmd := executor.Run(ctxWithTimeout)

var runError bytes.Buffer
runOutput := &streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
runCmd.Stderr = &runError
runCmd.Stdout = runOutput
go func(cmd *exec.Cmd, successChannel chan bool, errChannel chan error, dataChannel chan interface{}) {
err := cmd.Run()
if err != nil {
errChannel <- err
successChannel <- false
} else {
successChannel <- true
}
}(runCmd, successChannel, errorChannel, dataChannel)
runOutput := streaming.RunOutputWriter{Ctx: ctxWithTimeout, CacheService: cacheService, PipelineId: pipelineId}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some users may want to read stderr in the frontend as well as stdout, so maybe it makes sense to write that to the cache as well.
if I remember correctly, LOG data is weitten to stderr, while only System.out.println-type data will be sent to stdout

runCmdWithOutput(runCmd, &runOutput, &runError, successChannel, errorChannel)

processStep(ctxWithTimeout, pipelineId, cacheService, cancelChannel, successChannel, nil, &runError, errorChannel, pb.Status_STATUS_RUN_ERROR, pb.Status_STATUS_FINISHED)
}
Expand Down Expand Up @@ -180,10 +161,25 @@ func GetRunOutputLastIndex(ctx context.Context, cacheService cache.Cache, key uu
return intValue, nil
}

// runCmdWithOutput runs command with keeping stdOut and stdErr
func runCmdWithOutput(cmd *exec.Cmd, stdOutput io.Writer, stdError *bytes.Buffer, successChannel chan bool, errorChannel chan error) {
cmd.Stdout = stdOutput
cmd.Stderr = stdError
go func(cmd *exec.Cmd, successChannel chan bool, errChannel chan error) {
err := cmd.Run()
if err != nil {
errChannel <- err
successChannel <- false
} else {
successChannel <- true
}
}(cmd, successChannel, errorChannel)
}

// processStep processes each executor's step with cancel and timeout checks.
// If finishes by canceling, timeout or error - returns error.
// If finishes successfully returns nil.
func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool, dataChannel chan interface{}, errorDataBuffer *bytes.Buffer, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) error {
func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.Cache, cancelChannel, successChannel chan bool, outDataBuffer, errorDataBuffer *bytes.Buffer, errorChannel chan error, errorCaseStatus, successCaseStatus pb.Status) error {
select {
case <-ctx.Done():
finishByTimeout(ctx, pipelineId, cacheService)
Expand All @@ -192,21 +188,20 @@ func processStep(ctx context.Context, pipelineId uuid.UUID, cacheService cache.C
processCancel(ctx, cacheService, pipelineId)
return fmt.Errorf("%s: code processing was canceled", pipelineId)
case ok := <-successChannel:
var data []byte = nil
if dataChannel != nil {
temp := <-dataChannel
data = temp.([]byte)
var outData []byte = nil
if outDataBuffer != nil {
outData = outDataBuffer.Bytes()
}
if !ok {
err := <-errorChannel
var errorData = data
var errorData []byte = nil
if errorDataBuffer != nil {
errorData = errorDataBuffer.Bytes()
}
processError(ctx, err, errorData, pipelineId, cacheService, errorCaseStatus)
return fmt.Errorf("%s: code processing finishes with error: %s", pipelineId, err.Error())
}
processSuccess(ctx, data, pipelineId, cacheService, successCaseStatus)
processSuccess(ctx, outData, pipelineId, cacheService, successCaseStatus)
}
return nil
}
Expand Down