Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@
public class RunScriptExecutor implements RunnableTask<RunScript> {

private Optional<WorkflowValueResolver<Map<String, Object>>> environmentExpr;

private Optional<WorkflowValueResolver<Map<String, Object>>> argumentExpr;

private WorkflowValueResolver<String> codeSupplier;
private boolean isAwait;
private RunTaskConfiguration.ProcessReturnType returnType;
private Optional<RunTaskConfiguration.ProcessReturnType> returnType;
private ScriptRunner taskRunner;

@Override
Expand All @@ -65,7 +63,7 @@ public void init(RunScript taskConfiguration, WorkflowDefinition definition) {

this.isAwait = taskConfiguration.isAwait();

this.returnType = taskConfiguration.getReturn();
this.returnType = Optional.ofNullable(taskConfiguration.getReturn());

WorkflowApplication application = definition.application();
this.environmentExpr =
Expand Down Expand Up @@ -102,22 +100,24 @@ public void init(RunScript taskConfiguration, WorkflowDefinition definition) {
@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
return CompletableFuture.supplyAsync(
() ->
taskRunner.runScript(
new ScriptContext(
argumentExpr
.map(m -> m.apply(workflowContext, taskContext, input))
.orElse(Map.of()),
environmentExpr
.map(m -> m.apply(workflowContext, taskContext, input))
.orElse(Map.of()),
codeSupplier.apply(workflowContext, taskContext, input),
isAwait,
returnType),
workflowContext,
taskContext,
input));
ScriptContext scriptContext =
new ScriptContext(
argumentExpr.map(m -> m.apply(workflowContext, taskContext, input)).orElse(Map.of()),
environmentExpr.map(m -> m.apply(workflowContext, taskContext, input)).orElse(Map.of()),
codeSupplier.apply(workflowContext, taskContext, input),
returnType);
if (isAwait) {
return CompletableFuture.supplyAsync(
() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input),
workflowContext.definition().application().executorService());
} else {
workflowContext
.definition()
.application()
.executorService()
.submit(() -> taskRunner.runScript(scriptContext, workflowContext, taskContext, input));
return CompletableFuture.completedFuture(input);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,141 +17,142 @@

import io.serverlessworkflow.api.types.RunShell;
import io.serverlessworkflow.api.types.RunTaskConfiguration;
import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType;
import io.serverlessworkflow.api.types.Shell;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowError;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class RunShellExecutor implements RunnableTask<RunShell> {

private ShellResultSupplier shellResultSupplier;
private ProcessBuilderSupplier processBuilderSupplier;
private WorkflowValueResolver<String> shellCommand;
private Map<WorkflowValueResolver<String>, Optional<WorkflowValueResolver<String>>>
shellArguments;
private Optional<WorkflowValueResolver<Map<String, Object>>> shellEnv;
private Optional<ProcessReturnType> returnType;

@FunctionalInterface
private interface ShellResultSupplier {
WorkflowModel apply(
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
}
@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
StringBuilder commandBuilder =
new StringBuilder(shellCommand.apply(workflowContext, taskContext, model));
for (var entry : shellArguments.entrySet()) {
commandBuilder.append(" ").append(entry.getKey().apply(workflowContext, taskContext, model));
entry
.getValue()
.ifPresent(
v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model)));
}

@FunctionalInterface
private interface ProcessBuilderSupplier {
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
shellEnv.ifPresent(
map -> {
for (Map.Entry<String, Object> entry :
map.apply(workflowContext, taskContext, model).entrySet()) {
builder.environment().put(entry.getKey(), (String) entry.getValue());
}
});

return returnType
.map(
type ->
CompletableFuture.supplyAsync(
() ->
buildResultFromProcess(
workflowContext.definition().application().modelFactory(),
uncheckedStart(builder),
type)
.orElse(model),
workflowContext.definition().application().executorService()))
.orElseGet(
() -> {
workflowContext
.definition()
.application()
.executorService()
.submit(() -> uncheckedStart(builder));
return CompletableFuture.completedFuture(model);
});
}

@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
return CompletableFuture.supplyAsync(
() -> this.shellResultSupplier.apply(taskContext, input, processBuilder));
private Process uncheckedStart(ProcessBuilder builder) {
try {
return builder.start();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
Shell shell = taskConfiguration.getShell();
final String shellCommand = shell.getCommand();

if (shellCommand == null || shellCommand.isBlank()) {
if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) {
throw new IllegalStateException("Missing shell command in RunShell task configuration");
}
this.processBuilderSupplier =
(workflowContext, taskContext) -> {
WorkflowApplication application = definition.application();

StringBuilder commandBuilder =
new StringBuilder(
ExpressionUtils.isExpr(shellCommand)
? WorkflowUtils.buildStringFilter(application, shellCommand)
.apply(workflowContext, taskContext, taskContext.input())
: shellCommand);

if (shell.getArguments() != null
&& shell.getArguments().getAdditionalProperties() != null) {
for (Map.Entry<String, Object> entry :
shell.getArguments().getAdditionalProperties().entrySet()) {
commandBuilder
.append(" ")
.append(
ExpressionUtils.isExpr(entry.getKey())
? WorkflowUtils.buildStringFilter(application, entry.getKey())
.apply(workflowContext, taskContext, taskContext.input())
: entry.getKey());
if (entry.getValue() != null) {

commandBuilder
.append("=")
.append(
ExpressionUtils.isExpr(entry.getValue())
? WorkflowUtils.buildStringFilter(
application, entry.getValue().toString())
.apply(workflowContext, taskContext, taskContext.input())
: entry.getValue().toString());
}
}
}

// TODO: support Windows cmd.exe
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
if (shell.getEnvironment() != null
&& shell.getEnvironment().getAdditionalProperties() != null) {
for (Map.Entry<String, Object> entry :
shell.getEnvironment().getAdditionalProperties().entrySet()) {
String value =
ExpressionUtils.isExpr(entry.getValue())
? WorkflowUtils.buildStringFilter(application, entry.getValue().toString())
.apply(workflowContext, taskContext, taskContext.input())
: entry.getValue().toString();

// configure environments
builder.environment().put(entry.getKey(), value);
}
}
return builder;
};

this.shellResultSupplier =
(taskContext, input, processBuilder) -> {
try {
Process process = processBuilder.start();
return taskConfiguration.isAwait()
? buildResultFromProcess(taskConfiguration, definition, process)
: input;
} catch (IOException | InterruptedException e) {
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
}
};
shellCommand =
WorkflowUtils.buildStringFilter(
definition.application(), taskConfiguration.getShell().getCommand());

shellArguments =
shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null
? shell.getArguments().getAdditionalProperties().entrySet().stream()
.collect(
Collectors.toMap(
e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()),
e ->
e.getValue() != null
? Optional.of(
WorkflowUtils.buildStringFilter(
definition.application(), e.getValue().toString()))
: Optional.empty(),
(x, y) -> y,
LinkedHashMap::new))
: Map.of();

shellEnv =
shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null
? Optional.of(
WorkflowUtils.buildMapResolver(
definition.application(), shell.getEnvironment().getAdditionalProperties()))
: Optional.empty();

returnType =
taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty();
}

/**
* Builds the WorkflowModel result from the executed process. It waits for the process to finish
* and captures the exit code, stdout, and stderr based on the task configuration.
*/
private WorkflowModel buildResultFromProcess(
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
throws IOException, InterruptedException {
int exitCode = process.waitFor();
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);

WorkflowModelFactory modelFactory = definition.application().modelFactory();
return switch (taskConfiguration.getReturn()) {
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
case NONE -> modelFactory.fromNull();
case CODE -> modelFactory.from(exitCode);
case STDOUT -> modelFactory.from(stdout.trim());
case STDERR -> modelFactory.from(stderr.trim());
};
private static Optional<WorkflowModel> buildResultFromProcess(
WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) {
try {
int exitCode = process.waitFor();
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
return Optional.of(
switch (type) {
case ALL ->
modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
case NONE -> modelFactory.fromNull();
case CODE -> modelFactory.from(exitCode);
case STDOUT -> modelFactory.from(stdout.trim());
case STDERR -> modelFactory.from(stderr.trim());
});
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Optional.empty();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

import io.serverlessworkflow.api.types.RunTaskConfiguration;
import java.util.Map;
import java.util.Optional;

public record ScriptContext(
Map<String, Object> args,
Map<String, Object> envs,
String code,
boolean isAwait,
RunTaskConfiguration.ProcessReturnType returnType) {}
Optional<RunTaskConfiguration.ProcessReturnType> returnType) {}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public WorkflowModel runScript(
WorkflowApplication application = workflowContext.definition().application();
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
ByteArrayOutputStream stdout = new ByteArrayOutputStream();

try (Context ctx =
Context.newBuilder()
.err(stderr)
Expand All @@ -71,29 +70,26 @@ public WorkflowModel runScript(
(key, val) -> {
ctx.getBindings(identifier().getLang()).putMember(key, val);
});

configureProcessEnv(ctx, script.envs());

if (!script.isAwait()) {
application
.executorService()
.submit(
() -> {
ctx.eval(identifier().getLang(), script.code());
});
return application.modelFactory().fromAny(input);
}

ctx.eval(Source.create(identifier().getLang(), script.code()));

return modelFromOutput(
script.returnType(), application.modelFactory(), stdout, () -> stderr.toString());
return script
.returnType()
.map(
type ->
modelFromOutput(
type, application.modelFactory(), stdout, () -> stderr.toString()))
.orElse(input);
} catch (PolyglotException e) {
if (e.getExitStatus() != 0 || e.isSyntaxError()) {
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build());
} else {
return modelFromOutput(
script.returnType(), application.modelFactory(), stdout, () -> buildStderr(e, stderr));
return script
.returnType()
.map(
type ->
modelFromOutput(
type, application.modelFactory(), stdout, () -> buildStderr(e, stderr)))
.orElse(input);
}
}
}
Expand Down
Loading